You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:09:32 UTC

svn commit: r1181452 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/mapreduce/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: nspiegelberg
Date: Tue Oct 11 02:09:31 2011
New Revision: 1181452

URL: http://svn.apache.org/viewvc?rev=1181452&view=rev
Log:
HBASE-1861 : Multi-CF support for HFileOutputFormat

Summary:
Added ability for multiple column families to be concurrently written to
by HFOF.
Removed unnecessary dependency on Google jar

Test Plan:
- ant test-core

DiffCamp Revision: 183938
Reviewed By: kannan
CC: nspiegelberg, kannan, ruifang
Revert Plan:
OK

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:09:31 2011
@@ -53,8 +53,6 @@ import org.apache.hadoop.mapreduce.lib.o
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Writes HFiles. Passed KeyValues must arrive in order.
  * Currently, can only write files to a single column family at a
@@ -74,8 +72,10 @@ public class HFileOutputFormat extends F
     Configuration conf = context.getConfiguration();
     final FileSystem fs = outputdir.getFileSystem(conf);
     // These configs. are from hbase-*.xml
-    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
-    final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
+    final long maxsize = conf.getLong("hbase.hregion.max.filesize",
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    final int blocksize = conf.getInt("hfile.min.blocksize.size",
+        HFile.DEFAULT_BLOCKSIZE);
     // Invented config.  Add to hbase-*.xml if other than default compression.
     final String compression = conf.get("hfile.compression",
       Compression.Algorithm.NONE.getName());
@@ -87,48 +87,77 @@ public class HFileOutputFormat extends F
         new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
       private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
       private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+      private boolean rollRequested = false;
 
       public void write(ImmutableBytesWritable row, KeyValue kv)
       throws IOException {
+        // null input == user explicitly wants to flush
+        if (row == null && kv == null) {
+          rollWriters();
+          return;
+        }
+
+        byte [] rowKey = kv.getRow();
         long length = kv.getLength();
         byte [] family = kv.getFamily();
         WriterLength wl = this.writers.get(family);
-        if (wl == null || ((length + wl.written) >= maxsize) &&
-            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
-              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
-          // Get a new writer.
-          Path basedir = new Path(outputdir, Bytes.toString(family));
-          if (wl == null) {
-            wl = new WriterLength();
-            this.writers.put(family, wl);
-            if (this.writers.size() > 1) throw new IOException("One family only");
-            // If wl == null, first file in family.  Ensure family dir exits.
-            if (!fs.exists(basedir)) fs.mkdirs(basedir);
-          }
-          wl.writer = getNewWriter(wl.writer, basedir);
-          LOG.info("Writer=" + wl.writer.getPath() +
-            ((wl.written == 0)? "": ", wrote=" + wl.written));
-          wl.written = 0;
+
+        // If this is a new column family, verify that the directory exists
+        if (wl == null) {
+          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+        }
+
+        // If any of the HFiles for the column families has reached
+        // maxsize, we need to roll all the writers
+        if (wl != null && wl.written + length >= maxsize) {
+          this.rollRequested = true;
+        }
+
+        // This can only happen once a row is finished though
+        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+          rollWriters();
+        }
+
+        // create a new HLog writer, if necessary
+        if (wl == null || wl.writer == null) {
+          wl = getNewWriter(family);
         }
+
+        // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
         wl.writer.append(kv);
         wl.written += length;
+
         // Copy the row so we know when a row transition.
-        this.previousRow = kv.getRow();
+        this.previousRow = rowKey;
       }
 
-      /* Create a new HFile.Writer. Close current if there is one.
-       * @param writer
-       * @param familydir
-       * @return A new HFile.Writer.
+      private void rollWriters() throws IOException {
+        for (WriterLength wl : this.writers.values()) {
+          if (wl.writer != null) {
+            LOG.info("Writer=" + wl.writer.getPath() +
+                ((wl.written == 0)? "": ", wrote=" + wl.written));
+            close(wl.writer);
+          }
+          wl.writer = null;
+          wl.written = 0;
+        }
+        this.rollRequested = false;
+      }
+
+      /* Create a new HFile.Writer.
+       * @param family
+       * @return A WriterLength, containing a new HFile.Writer.
        * @throws IOException
        */
-      private HFile.Writer getNewWriter(final HFile.Writer writer,
-          final Path familydir)
-      throws IOException {
-        close(writer);
-        return new HFile.Writer(fs,  StoreFile.getUniqueFile(fs, familydir),
-          blocksize, bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+      private WriterLength getNewWriter(byte[] family) throws IOException {
+        WriterLength wl = new WriterLength();
+        Path familydir = new Path(outputdir, Bytes.toString(family));
+        wl.writer = new HFile.Writer(fs,
+          StoreFile.getUniqueFile(fs, familydir), blocksize,
+          bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+        this.writers.put(family, wl);
+        return wl;
       }
 
       private void close(final HFile.Writer w) throws IOException {
@@ -145,8 +174,8 @@ public class HFileOutputFormat extends F
 
       public void close(TaskAttemptContext c)
       throws IOException, InterruptedException {
-        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
-          close(e.getValue().writer);
+        for (WriterLength wl: this.writers.values()) {
+          close(wl.writer);
         }
       }
     };
@@ -183,7 +212,9 @@ public class HFileOutputFormat extends F
    */
   private static void writePartitions(Configuration conf, Path partitionsPath,
       List<ImmutableBytesWritable> startKeys) throws IOException {
-    Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed");
+    if (startKeys.isEmpty()) {
+      throw new IllegalArgumentException("No regions passed");
+    }
 
     // We're generating a list of split points, and we don't ever
     // have keys < the first region (which has an empty start key)
@@ -193,10 +224,11 @@ public class HFileOutputFormat extends F
       new TreeSet<ImmutableBytesWritable>(startKeys);
 
     ImmutableBytesWritable first = sorted.first();
-    Preconditions.checkArgument(
-        first.equals(HConstants.EMPTY_BYTE_ARRAY),
-        "First region of table should have empty start key. Instead has: %s",
-        Bytes.toStringBinary(first.get()));
+    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+      throw new IllegalArgumentException(
+          "First region of table should have empty start key. Instead has: "
+          + Bytes.toStringBinary(first.get()));
+    }
     sorted.remove(first);
 
     // Write the actual file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Tue Oct 11 02:09:31 2011
@@ -290,6 +290,8 @@ public class ImportTsv {
     }
 
     TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+        com.google.common.base.Function.class /* Guava used by TsvParser */);
     return job;
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java Tue Oct 11 02:09:31 2011
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Emits sorted Puts.
@@ -46,21 +48,37 @@ public class PutSortReducer extends
               ImmutableBytesWritable, KeyValue>.Context context)
       throws java.io.IOException, InterruptedException
   {
-    TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-
-    for (Put p : puts) {
-      for (List<KeyValue> kvs : p.getFamilyMap().values()) {
-        for (KeyValue kv : kvs) {
-          map.add(kv.clone());
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "putsortreducer.row.threshold", 2L * (1<<30));
+    Iterator<Put> iter = puts.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Put p = iter.next();
+        for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+          for (KeyValue kv : kvs) {
+            map.add(kv);
+            curSize += kv.getValueLength();
+          }
         }
       }
-    }
-    context.setStatus("Read " + map.getClass());
-    int index = 0;
-    for (KeyValue kv : map) {
-      context.write(row, kv);
-      if (index > 0 && index % 100 == 0)
-        context.setStatus("Wrote " + index);
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        if (index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index);
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
     }
   }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Tue Oct 11 02:09:31 2011
@@ -49,8 +49,6 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.conf.Configuration;
 import org.apache.zookeeper.ZooKeeper;
 
-import com.google.common.base.Function;
-
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
  */
@@ -247,7 +245,6 @@ public class TableMapReduceUtil {
     try {
       addDependencyJars(job.getConfiguration(),
           ZooKeeper.class,
-          Function.class, // Guava collections
           job.getMapOutputKeyClass(),
           job.getMapOutputValueClass(),
           job.getOutputKeyClass(),

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:09:31 2011
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
@@ -64,7 +67,9 @@ import org.mockito.Mockito;
 public class TestHFileOutputFormat  {
   private final static int ROWSPERSPLIT = 1024;
 
-  private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+  private static final byte[][] FAMILIES
+    = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
 
   private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -118,9 +123,11 @@ public class TestHFileOutputFormat  {
         random.nextBytes(valBytes);
         ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
 
-        KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
-            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
-        context.write(key, kv);
+        for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+          KeyValue kv = new KeyValue(keyBytes, family,
+              PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+          context.write(key, kv);
+        }
       }
     }
   }
@@ -267,14 +274,14 @@ public class TestHFileOutputFormat  {
     try {
       util.startMiniCluster();
       HBaseAdmin admin = new HBaseAdmin(conf);
-      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
-      int numRegions = util.createMultiRegions(
-          util.getConfiguration(), table, FAMILY_NAME,
-          startKeys);
-      assertEquals("Should make 5 regions",
-          numRegions, 5);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
       assertEquals("Should start with empty table",
           0, util.countRows(table));
+      for (byte[] family : FAMILIES) {
+        int numRegions = util.createMultiRegions(
+            util.getConfiguration(), table, family, startKeys);
+        assertEquals("Should make 5 regions", numRegions, 5);
+      }
 
       // Generate the bulk load files
       util.startMiniMapReduceCluster();
@@ -283,12 +290,26 @@ public class TestHFileOutputFormat  {
       assertEquals("HFOF should not touch actual table",
           0, util.countRows(table));
 
+      // Make sure that a directory was created for every CF
+      int dir = 0;
+      for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+        for (byte[] family : FAMILIES) {
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            ++dir;
+          }
+        }
+      }
+      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+
+      // handle the split case
       if (shouldChangeRegions) {
         LOG.info("Changing regions in table");
         admin.disableTable(table.getTableName());
         byte[][] newStartKeys = generateRandomStartKeys(15);
-        util.createMultiRegions(util.getConfiguration(),
-            table, FAMILY_NAME, newStartKeys);
+        for (byte[] family : FAMILIES) {
+          util.createMultiRegions(
+              util.getConfiguration(), table, family, newStartKeys);
+        }
         admin.enableTable(table.getTableName());
         while (table.getRegionsInfo().size() != 15 ||
             !admin.isTableAvailable(table.getTableName())) {
@@ -304,6 +325,19 @@ public class TestHFileOutputFormat  {
       int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
       assertEquals("LoadIncrementalHFiles should put expected data in table",
           expectedRows, util.countRows(table));
+      Scan scan = new Scan();
+      ResultScanner results = table.getScanner(scan);
+      int count = 0;
+      for (Result res : results) {
+        count++;
+        assertEquals(FAMILIES.length, res.raw().length);
+        KeyValue first = res.raw()[0];
+        for (KeyValue kv : res.raw()) {
+          assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
+          assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
+        }
+      }
+      results.close();
       String tableDigestBefore = util.checksumRows(table);
 
       // Cause regions to reopen
@@ -348,11 +382,13 @@ public class TestHFileOutputFormat  {
     util = new HBaseTestingUtility(conf);
     if ("newtable".equals(args[0])) {
       byte[] tname = args[1].getBytes();
-      HTable table = util.createTable(tname, FAMILY_NAME);
+      HTable table = util.createTable(tname, FAMILIES);
       HBaseAdmin admin = new HBaseAdmin(conf);
       admin.disableTable(tname);
-      util.createMultiRegions(conf, table, FAMILY_NAME,
-          generateRandomStartKeys(5));
+      byte[][] startKeys = generateRandomStartKeys(5);
+      for (byte[] family : FAMILIES) {
+        util.createMultiRegions(conf, table, family, startKeys);
+      }
       admin.enableTable(tname);
     } else if ("incremental".equals(args[0])) {
       byte[] tname = args[1].getBytes();