You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/16 19:25:41 UTC
svn commit: r1082246 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Author: stack
Date: Wed Mar 16 18:25:41 2011
New Revision: 1082246
URL: http://svn.apache.org/viewvc?rev=1082246&view=rev
Log:
HBASE-3440 Clean out load_table.rb and make sure all roads lead to completebulkload tool
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1082246&r1=1082245&r2=1082246&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Mar 16 18:25:41 2011
@@ -113,6 +113,8 @@ Release 0.91.0 - Unreleased
HBASE-2495 Allow record filtering with selected row key values in HBase
Export (Subbu M Iyer via Stack)
HBASE-3600 Update our jruby to 1.6.0
+ HBASE-3440 Clean out load_table.rb and make sure all roads lead to
+ completebulkload tool (Vidhyashankar Venkataraman via Stack)
TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1082246&r1=1082245&r2=1082246&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Mar 16 18:25:41 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;
+import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import java.util.TreeMap;
+
/**
* Tool to load the output of HFileOutputFormat into an existing table.
@@ -58,14 +64,21 @@ import org.apache.hadoop.util.ToolRunner
*/
public class LoadIncrementalHFiles extends Configured implements Tool {
- static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+ private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+ private static final int TABLE_CREATE_MAX_RETRIES = 20;
+ private static final long TABLE_CREATE_SLEEP = 60000;
+ private HBaseAdmin hbAdmin;
public static String NAME = "completebulkload";
- public LoadIncrementalHFiles(Configuration conf) {
+ public LoadIncrementalHFiles(Configuration conf) throws Exception {
super(conf);
+ this.hbAdmin = new HBaseAdmin(conf);
}
+ /* This constructor does not load the HBase configuration and leaves hbAdmin unset.
+ * A configuration must be supplied explicitly by the caller. Do we need this constructor?
+ */
public LoadIncrementalHFiles() {
super();
}
@@ -299,6 +312,125 @@ public class LoadIncrementalHFiles exten
return !HFile.isReservedFileInfoKey(key);
}
+ private boolean doesTableExist(String tableName) throws Exception { // lets run() decide whether the table must be created before loading
+ return hbAdmin.tableExists(tableName); // hbAdmin is only assigned by the Configuration-taking constructor
+ }
+
+ /**
+ * Infers region boundaries (split keys) for a new table from the key ranges of a set of files.
+ *
+ * Parameter:
+ * bdryMap maps each key to the net count of files starting (+1) and ending (-1) at that key.
+ * A key that is the start key of one file contributes +1.
+ * A key that is the end key of one file contributes -1.
+ * A key that is both the end of one file and the start of another therefore maps to 0.
+ *
+ * Algo:
+ * 1) Visit the entries in the iteration order of bdryMap:
+ * a) Whenever runningValue is 0 on arrival, remember the key as the start of a new group
+ * of overlapping files.
+ * b) Add the mapped value to runningValue.
+ * c) Each time runningValue returns to 0 the group is closed; its remembered start key is
+ * added to the boundary list, except for the very first group that closes.
+ * 2) Return the boundary list as an array.
+ *
+ * NOTE(review): the first group is presumably skipped because the first region of a table
+ * needs no explicit split key -- confirm against HBaseAdmin.createTableAsync.
+ *
+ * Returns the start keys of every group after the first, in iteration order; the array is
+ * empty when fewer than two groups are found.
+ */
+ public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
+ ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
+ int runningValue = 0; // number of files currently "open" at this point of the walk
+ byte[] currStartKey = null; // start key of the group being accumulated
+ boolean firstBoundary = true; // true until the first group has been closed
+
+ for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
+ if (runningValue == 0) currStartKey = item.getKey(); // nothing open: this key begins a new group
+ runningValue += item.getValue();
+ if (runningValue == 0) { // every file opened since currStartKey has ended
+ if (!firstBoundary) keysArray.add(currStartKey); // the first group's start key is not emitted
+ firstBoundary = false;
+ }
+ }
+
+ return keysArray.toArray(new byte[0][0]);
+ }
+
+ /*
+ * If the table is created for the first time, then "completebulkload" reads the files twice.
+ * More modifications necessary if we want to avoid doing it.
+ */
+ private void createTable(String tableName, String dirPath) throws Exception {
+ Path hfofDir = new Path(dirPath);
+ FileSystem fs = hfofDir.getFileSystem(getConf());
+
+ if (!fs.exists(hfofDir)) {
+ throw new FileNotFoundException("HFileOutputFormat dir " +
+ hfofDir + " not found");
+ }
+
+ FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+ if (familyDirStatuses == null) {
+ throw new FileNotFoundException("No families found in " + hfofDir);
+ }
+
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ HColumnDescriptor hcd = null;
+
+ // Add column families
+ // Build a set of keys
+ byte[][] keys = null;
+ TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+ for (FileStatus stat : familyDirStatuses) {
+ if (!stat.isDir()) {
+ LOG.warn("Skipping non-directory " + stat.getPath());
+ continue;
+ }
+ Path familyDir = stat.getPath();
+ // Skip _logs, etc
+ if (familyDir.getName().startsWith("_")) continue;
+ byte[] family = familyDir.getName().getBytes();
+
+ hcd = new HColumnDescriptor(family);
+ htd.addFamily(hcd);
+
+ Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+ for (Path hfile : hfiles) {
+ if (hfile.getName().startsWith("_")) continue;
+
+ HFile.Reader reader = new HFile.Reader(fs, hfile, null, false, false);
+ final byte[] first, last;
+ try {
+ reader.loadFileInfo();
+ first = reader.getFirstRowKey();
+ last = reader.getLastRowKey();
+
+ LOG.info("Trying to figure out region boundaries hfile=" + hfile +
+ " first=" + Bytes.toStringBinary(first) +
+ " last=" + Bytes.toStringBinary(last));
+
+ // To eventually infer start key-end key boundaries
+ Integer value = map.containsKey(first)?(Integer)map.get(first):0;
+ map.put(first, value+1);
+
+ value = map.containsKey(last)?(Integer)map.get(last):0;
+ map.put(last, value-1);
+ } finally {
+ reader.close();
+ }
+ }
+ }
+
+ keys = LoadIncrementalHFiles.inferBoundaries(map);
+ try {
+ this.hbAdmin.createTableAsync(htd, keys);
+ } catch (java.net.SocketTimeoutException e) {
+ System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
+ }
+
+ HTable table = new HTable(tableName);
+
+ HConnection conn = table.getConnection();
+ int ctr = 0;
+ while (!conn.isTableAvailable(table.getTableName()) && (ctr<TABLE_CREATE_MAX_RETRIES)) {
+ LOG.info("Table " + tableName + "not yet available... Sleeping for 60 more seconds...");
+ /* Every TABLE_CREATE_SLEEP milliseconds, wakes up and checks if the table is available*/
+ Thread.sleep(TABLE_CREATE_SLEEP);
+ ctr++;
+ }
+ LOG.info("Table "+ tableName +" is finally available!!");
+ }
@Override
public int run(String[] args) throws Exception {
@@ -307,15 +439,21 @@ public class LoadIncrementalHFiles exten
return -1;
}
- Path hfofDir = new Path(args[0]);
- HTable table = new HTable(this.getConf(), args[1]);
+ String dirPath = args[0];
+ String tableName = args[1];
+
+ boolean tableExists = this.doesTableExist(tableName);
+ if (!tableExists) this.createTable(tableName,dirPath);
+
+ Path hfofDir = new Path(dirPath);
+ HTable table = new HTable(tableName);
doBulkLoad(hfofDir, table);
return 0;
}
public static void main(String[] args) throws Exception {
- ToolRunner.run(new LoadIncrementalHFiles(), args);
+ ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
}
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1082246&r1=1082245&r2=1082246&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Wed Mar 16 18:25:41 2011
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.mapreduc
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -186,4 +188,70 @@ public class TestLoadIncrementalHFiles {
writer.close();
}
}
+
+ private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { // records one [first, last] key range in the +1/-1 form passed to inferBoundaries
+ Integer value = map.containsKey(first)?(Integer)map.get(first):0; // current tally for the start key, 0 if the key is new
+ map.put(first, value+1); // a range starting at this key adds +1
+
+ value = map.containsKey(last)?(Integer)map.get(last):0; // read after the put above, so first == last nets out to 0
+ map.put(last, value-1); // a range ending at this key adds -1
+ }
+
+ /**
+  * Feeds inferBoundaries a hand-built set of overlapping key ranges and checks that the
+  * start keys of every merged range except the first come back, in order.
+  */
+ @Test
+ public void testInferBoundaries() {
+   TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+   /* Toy example
+    *     c---------i            o------p          s---------t     v------x
+    * a------e    g-----k   m-------------q   r----s            u----w
+    *
+    * Should be inferred as:
+    * a-----------------k   m-------------q   r--------------t  u---------x
+    *
+    * The output should be (m,r,u)
+    */
+
+   // {first, last} row keys of each simulated file, deliberately not in sorted order.
+   String[][] ranges = {
+     {"a", "e"}, {"r", "s"}, {"o", "p"}, {"g", "k"}, {"v", "x"},
+     {"c", "i"}, {"m", "q"}, {"s", "t"}, {"u", "w"}
+   };
+   for (String[] range : ranges) {
+     addStartEndKeysForTest(map, Bytes.toBytes(range[0]), Bytes.toBytes(range[1]));
+   }
+
+   byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
+   byte[][] expected = { Bytes.toBytes("m"), Bytes.toBytes("r"), Bytes.toBytes("u") };
+
+   // JUnit convention: expected value first, so failure messages read correctly.
+   assertEquals(expected.length, keysArray.length);
+
+   for (int row = 0; row < keysArray.length; row++) {
+     assertArrayEquals(expected[row], keysArray[row]);
+   }
+ }
+
}