Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/16 19:25:41 UTC

svn commit: r1082246 - in /hbase/trunk: CHANGES.txt src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java

Author: stack
Date: Wed Mar 16 18:25:41 2011
New Revision: 1082246

URL: http://svn.apache.org/viewvc?rev=1082246&view=rev
Log:
HBASE-3440 Clean out load_table.rb and make sure all roads lead to completebulkload tool
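
For reference, below is a minimal sketch (not part of this commit) of driving
the consolidated tool programmatically, mirroring the new main() in the diff
that follows; the output directory and table name are hypothetical:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadDriver {
      public static void main(String[] args) throws Exception {
        // Arguments: <directory written by HFileOutputFormat> <target table>.
        // As of this change the tool creates the target table, pre-split at
        // inferred region boundaries, when it does not already exist.
        int ret = ToolRunner.run(
            new LoadIncrementalHFiles(HBaseConfiguration.create()),
            new String[] { "/user/me/hfile-output", "mytable" });
        System.exit(ret);
      }
    }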

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1082246&r1=1082245&r2=1082246&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Mar 16 18:25:41 2011
@@ -113,6 +113,8 @@ Release 0.91.0 - Unreleased
    HBASE-2495  Allow record filtering with selected row key values in HBase
                Export (Subbu M Iyer via Stack)
    HBASE-3600  Update our jruby to 1.6.0
+   HBASE-3440  Clean out load_table.rb and make sure all roads lead to
+               completebulkload tool (Vidhyashankar Venkataraman via Stack)
 
   TASK
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1082246&r1=1082245&r2=1082246&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Wed Mar 16 18:25:41 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import java.util.TreeMap;
+
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
@@ -58,14 +64,21 @@ import org.apache.hadoop.util.ToolRunner
  */
 public class LoadIncrementalHFiles extends Configured implements Tool {
 
-  static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+  private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+  private static final int  TABLE_CREATE_MAX_RETRIES = 20;
+  private static final long TABLE_CREATE_SLEEP = 60000;
+  private HBaseAdmin hbAdmin;
 
   public static String NAME = "completebulkload";
 
-  public LoadIncrementalHFiles(Configuration conf) {
+  public LoadIncrementalHFiles(Configuration conf) throws Exception {
     super(conf);
+    this.hbAdmin = new HBaseAdmin(conf);
   }
 
+  /* This constructor does not set up an HBase configuration;
+   * callers must supply one explicitly. Do we need this constructor?
+   */
   public LoadIncrementalHFiles() {
     super();
   }
@@ -299,6 +312,125 @@ public class LoadIncrementalHFiles exten
     return !HFile.isReservedFileInfoKey(key);
   }
 
+  private boolean doesTableExist(String tableName) throws Exception {
+    return hbAdmin.tableExists(tableName);
+  }
+  
+  /*
+   * Infers region boundaries for a new table from the start and end keys
+   * of its HFiles.
+   * Parameter:
+   *   bdryMap maps each key to an integer delta:
+   *     +1 if the key is the start key of some file,
+   *     -1 if the key is the end key of some file.
+   * Algorithm:
+   * 1) Iterate over the keys in sorted order, adding each delta to a running sum (runningValue).
+   * 2) Whenever runningValue returns to 0, a contiguous range has closed; record the key that opened it as a boundary.
+   * 3) Return the boundaries; the first range's start key is skipped, since the first region's start key is implicit.
+   */
+  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
+    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
+    int runningValue = 0;
+    byte[] currStartKey = null;
+    boolean firstBoundary = true;
+    
+    for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
+      if (runningValue == 0) currStartKey = item.getKey();
+      runningValue += item.getValue();
+      if (runningValue == 0) {
+        if (!firstBoundary) keysArray.add(currStartKey);
+        firstBoundary = false;
+      } 
+    }
+    
+    return keysArray.toArray(new byte[0][0]);
+  }
+ 
+  /*
+   * If the table is created for the first time, "completebulkload" reads the HFiles twice: once
+   * here to infer region boundaries, once more during the load. Avoiding this needs more changes.
+   */
+  private void createTable(String tableName, String dirPath) throws Exception {
+    Path hfofDir = new Path(dirPath);
+    FileSystem fs = hfofDir.getFileSystem(getConf());
+
+    if (!fs.exists(hfofDir)) {
+      throw new FileNotFoundException("HFileOutputFormat dir " +
+          hfofDir + " not found");
+    }
+
+    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + hfofDir);
+    }
+
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = null;
+
+    // Add column families
+    // Build a set of keys
+    byte[][] keys = null;
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    
+    for (FileStatus stat : familyDirStatuses) {
+      if (!stat.isDir()) {
+        LOG.warn("Skipping non-directory " + stat.getPath());
+        continue;
+      }
+      Path familyDir = stat.getPath();
+      // Skip _logs, etc
+      if (familyDir.getName().startsWith("_")) continue;
+      byte[] family = familyDir.getName().getBytes();
+     
+      hcd = new HColumnDescriptor(family);
+      htd.addFamily(hcd);
+      
+      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+      for (Path hfile : hfiles) {
+        if (hfile.getName().startsWith("_")) continue;
+        
+        HFile.Reader reader = new HFile.Reader(fs, hfile, null, false, false);
+        final byte[] first, last;
+        try {
+          reader.loadFileInfo();
+          first = reader.getFirstRowKey();
+          last =  reader.getLastRowKey();
+
+          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
+            " first=" + Bytes.toStringBinary(first) +
+            " last="  + Bytes.toStringBinary(last));
+          
+          // To eventually infer start key-end key boundaries
+          Integer value = map.containsKey(first) ? map.get(first) : 0;
+          map.put(first, value + 1);
+
+          value = map.containsKey(last) ? map.get(last) : 0;
+          map.put(last, value - 1);
+        }  finally {
+          reader.close();
+        }
+      }
+    }
+    
+    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    try {
+      this.hbAdmin.createTableAsync(htd, keys);
+    } catch (java.net.SocketTimeoutException e) {
+      LOG.warn("Caught a socket timeout; the master is likely slow assigning regions", e);
+    }
+         
+    HTable table = new HTable(getConf(), tableName);
+
+    HConnection conn = table.getConnection();
+    int ctr = 0;
+    // Wake up every TABLE_CREATE_SLEEP ms and re-check whether the table is available.
+    while (!conn.isTableAvailable(table.getTableName()) && (ctr < TABLE_CREATE_MAX_RETRIES)) {
+      LOG.info("Table " + tableName + " not yet available... sleeping " + TABLE_CREATE_SLEEP + "ms");
+      Thread.sleep(TABLE_CREATE_SLEEP);
+      ctr++;
+    }
+    LOG.info("Table " + tableName + " is now available");
+  }
 
   @Override
   public int run(String[] args) throws Exception {
@@ -307,15 +439,21 @@ public class LoadIncrementalHFiles exten
       return -1;
     }
 
-    Path hfofDir = new Path(args[0]);
-    HTable table = new HTable(this.getConf(), args[1]);
+    String dirPath   = args[0];
+    String tableName = args[1];
+
+    boolean tableExists = this.doesTableExist(tableName);
+    if (!tableExists) this.createTable(tableName, dirPath);
+
+    Path hfofDir = new Path(dirPath);
+    HTable table = new HTable(this.getConf(), tableName);
 
     doBulkLoad(hfofDir, table);
     return 0;
   }
 
   public static void main(String[] args) throws Exception {
-    ToolRunner.run(new LoadIncrementalHFiles(), args);
+    ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
   }
 
 }
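
As an aside, here is a small sketch (not part of the patch) of the delta-map
bookkeeping that feeds inferBoundaries() above: start keys contribute +1, end
keys contribute -1, and a split point falls wherever the running sum returns
to zero. The file ranges are made up:

    import java.util.TreeMap;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BoundarySketch {
      public static void main(String[] args) {
        TreeMap<byte[], Integer> map =
            new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
        // Two overlapping files a-e and c-i, plus a disjoint file m-q.
        add(map, "a", +1); add(map, "e", -1);
        add(map, "c", +1); add(map, "i", -1);
        add(map, "m", +1); add(map, "q", -1);
        // Prints only "m": the merged range a-i closes first and its start
        // key is skipped because the first region's start key is implicit.
        for (byte[] k : LoadIncrementalHFiles.inferBoundaries(map)) {
          System.out.println(Bytes.toStringBinary(k));
        }
      }

      private static void add(TreeMap<byte[], Integer> m, String key, int delta) {
        byte[] k = Bytes.toBytes(key);
        Integer v = m.containsKey(k) ? m.get(k) : 0;
        m.put(k, v + delta);
      }
    }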

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1082246&r1=1082245&r2=1082246&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Wed Mar 16 18:25:41 2011
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.mapreduc
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import static org.junit.Assert.assertArrayEquals;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -186,4 +188,70 @@ public class TestLoadIncrementalHFiles {
       writer.close();
     }
   }
+
+  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
+    Integer value = map.containsKey(first) ? map.get(first) : 0;
+    map.put(first, value + 1);
+
+    value = map.containsKey(last) ? map.get(last) : 0;
+    map.put(last, value - 1);
+  }
+
+  @Test 
+  public void testInferBoundaries() {
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+    /* Toy example
+     *     c---------i            o------p          s---------t     v------x
+     * a------e    g-----k   m-------------q   r----s            u----w
+     *
+     * Should be inferred as:
+     * a-----------------k   m-------------q   r--------------t  u---------x
+     * 
+     * The output should be (m,r,u) 
+     */
+
+    String first;
+    String last;
+
+    first = "a"; last = "e";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+    
+    first = "r"; last = "s";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "o"; last = "p";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "g"; last = "k";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "v"; last = "x";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "c"; last = "i";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "m"; last = "q";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "s"; last = "t";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+    
+    first = "u"; last = "w";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
+    byte[][] compare = new byte[3][];
+    compare[0] = "m".getBytes();
+    compare[1] = "r".getBytes(); 
+    compare[2] = "u".getBytes();
+
+    assertEquals(3, keysArray.length);
+
+    for (int row = 0; row < keysArray.length; row++) {
+      assertArrayEquals(compare[row], keysArray[row]);
+    }
+  }
+
 }
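
For completeness, the wait loop in createTable() above reduces to the pattern
below. This is an illustrative sketch rather than code from the patch; the
class and method names are invented, and the retry count and sleep interval
stand in for TABLE_CREATE_MAX_RETRIES and TABLE_CREATE_SLEEP:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HTable;

    public class WaitForTable {
      // Returns true once the table is available, false if retries ran out.
      public static boolean waitForTable(Configuration conf, String tableName,
          int maxRetries, long sleepMs) throws Exception {
        HTable table = new HTable(conf, tableName);
        HConnection conn = table.getConnection();
        for (int i = 0; i < maxRetries; i++) {
          if (conn.isTableAvailable(table.getTableName())) {
            return true;
          }
          Thread.sleep(sleepMs); // wake up periodically and re-check
        }
        return conn.isTableAvailable(table.getTableName());
      }
    }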