Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:20:54 UTC

svn commit: r1181566 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/mapreduce/

Author: nspiegelberg
Date: Tue Oct 11 02:20:53 2011
New Revision: 1181566

URL: http://svn.apache.org/viewvc?rev=1181566&view=rev
Log:
Add BloomFilter and timestamp range for bulk import

Summary:
Add BloomFilter and timestamp range for bulk import.

This is the version updated for HFile v2.
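
The patch stores each column family's Bloom filter type in the job
configuration as a single URL-encoded string of the form
family1=bloomType1&family2=bloomType2&... (BLOOMFILTER_TYPE_PER_CF_KEY) and
decodes it again in the RecordWriter. Below is a minimal, self-contained
sketch of that encode/decode round trip; it uses plain Strings in place of
StoreFile.BloomType and is illustrative only, not part of the patch.

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class BloomTypePerCfEncoding {

  // Encode family -> Bloom type pairs into "family1=type1&family2=type2&...".
  static String encode(Map<String, String> familyToBloomType)
      throws UnsupportedEncodingException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : familyToBloomType.entrySet()) {
      if (sb.length() > 0) {
        sb.append('&');
      }
      sb.append(URLEncoder.encode(e.getKey(), "UTF-8"));
      sb.append('=');
      sb.append(URLEncoder.encode(e.getValue(), "UTF-8"));
    }
    return sb.toString();
  }

  // Decode the configuration value back into a family -> Bloom type map.
  static Map<String, String> decode(String confValue)
      throws UnsupportedEncodingException {
    Map<String, String> result = new LinkedHashMap<String, String>();
    if (confValue == null || confValue.isEmpty()) {
      return result;
    }
    for (String familyConf : confValue.split("&")) {
      String[] parts = familyConf.split("=");
      if (parts.length != 2) {
        throw new AssertionError("invalid bloomfilter type configuration: " + familyConf);
      }
      result.put(URLDecoder.decode(parts[0], "UTF-8"),
          URLDecoder.decode(parts[1], "UTF-8"));
    }
    return result;
  }

  public static void main(String[] args) throws UnsupportedEncodingException {
    Map<String, String> families = new LinkedHashMap<String, String>();
    families.put("info-A", "ROW");
    families.put("info-B", "ROWCOL");
    families.put("info-C", "NONE");
    String encoded = encode(families);
    System.out.println(encoded);          // info-A=ROW&info-B=ROWCOL&info-C=NONE
    System.out.println(decode(encoded));  // {info-A=ROW, info-B=ROWCOL, info-C=NONE}
  }
}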

Test Plan:
Unit tests.

Reviewed By: kannan
Reviewers: nspiegelberg, kannan, kranganathan, mbautin
Commenters: kranganathan, nspiegelberg, mbautin
CC: hbase@lists, kranganathan, nspiegelberg, gqchen, mbautin, kannan
Differential Revision: 237110

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:20:53 2011
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -70,6 +72,9 @@ import org.apache.commons.logging.LogFac
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  // This stores a string in the format family1=bloomType1&family2=bloomType2&...&familyN=bloomTypeN
+  static final String BLOOMFILTER_TYPE_PER_CF_KEY =
+    "hbase.hfileoutputformat.families.bloomfilter.typePerCF";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -90,7 +95,7 @@ public class HFileOutputFormat extends F
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
 
-    final int bytesPerChecksum = HFile.getBytesPerChecksum(conf, conf);
+    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
 
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
       // Map of families to writers and how much has been output on the writer.
@@ -102,6 +107,7 @@ public class HFileOutputFormat extends F
 
       public void write(ImmutableBytesWritable row, KeyValue kv)
       throws IOException {
+
         // null input == user explicitly wants to flush
         if (row == null && kv == null) {
           rollWriters();
@@ -167,21 +173,35 @@ public class HFileOutputFormat extends F
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactory(conf).createWriter(fs,
-          StoreFile.getUniqueFile(fs, familydir), blocksize,
-          bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+        Compression.Algorithm compressionAlgo =
+          Compression.getCompressionAlgorithmByName(compression);
+
+        BloomType bloomType = bloomTypeMap.get(family);
+        if (bloomType == null) {
+          bloomType = BloomType.NONE;
+        }
+
+        /* new bloom filter does not require maxKeys. */
+        int maxKeys = 0;
+        wl.writer = StoreFile.createWriter(fs, familydir, blocksize,
+            compressionAlgo, KeyValue.COMPARATOR, conf, bloomType, maxKeys);
         this.writers.put(family, wl);
         return wl;
       }
 
-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
           w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
               Bytes.toBytes(context.getTaskAttemptID().toString()));
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, 
-              Bytes.toBytes(true));
+
+          /* Set maxSequenceId to be 0 for bulk imported files since
+           * these files do not correspond to any edit log items.
+           *
+           * Set the majorCompaction flag to false for bulk import files.
+           */
+          w.appendMetadata(0, false);
           w.close();
         }
       }
@@ -200,7 +220,7 @@ public class HFileOutputFormat extends F
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }
 
   /**
@@ -317,6 +337,10 @@ public class HFileOutputFormat extends F
     // Set compression algorithms based on column families
     configureCompression(table, conf);
 
+    // Set BloomFilter type based on column families and
+    // relevant parameters.
+    configureBloomFilter(table, conf);
+
     LOG.info("Incremental table output configured.");
   }
 
@@ -379,4 +403,66 @@ public class HFileOutputFormat extends F
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+
+  private static void configureBloomFilter(HTable table, Configuration conf)
+  throws IOException {
+    // get conf information needed by BloomFilter
+    Configuration tableConf = table.getConfiguration();
+
+    // Now go through the column family and save the BloomFilter setting for
+    // each column family
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null){
+      return;
+    }
+
+    if (tableConf != null) {
+      // copying Bloom filter related configuration to conf.
+      BloomFilterFactory.copyBloomFilterConf(tableConf, conf);
+    }
+
+    StringBuilder bloomfilterTypePerCFConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomfilterTypePerCFConfigValue.append('&');
+      }
+
+      bloomfilterTypePerCFConfigValue.append(
+          URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomfilterTypePerCFConfigValue.append('=');
+      bloomfilterTypePerCFConfigValue.append(
+          URLEncoder.encode(familyDescriptor.getBloomFilterType().toString(),
+          "UTF-8"));
+    }
+
+    conf.set(BLOOMFILTER_TYPE_PER_CF_KEY, bloomfilterTypePerCFConfigValue.toString());
+  }
+
+  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+    Map<byte[], BloomType> bloomTypeMap =
+      new TreeMap<byte[], BloomType >(Bytes.BYTES_COMPARATOR);
+    String bloomFilterTypeConf = conf.get(BLOOMFILTER_TYPE_PER_CF_KEY, "");
+
+    if (bloomFilterTypeConf.isEmpty()) {
+      return bloomTypeMap;
+    }
+
+    for (String familyConf : bloomFilterTypeConf.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      if (familySplit.length != 2) {
+        throw new AssertionError("invalid bloomfilter type configuration");
+      }
+
+      try {
+        bloomTypeMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+            BloomType.valueOf(URLDecoder.decode(familySplit[1], "UTF-8")));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return bloomTypeMap;
+  }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 11 02:20:53 2011
@@ -116,7 +116,7 @@ public class StoreFile {
   private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
 
   /** Key for Timerange information in metadata*/
-  static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
+  public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
   // Need to make it 8k for testing.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java Tue Oct 11 02:20:53 2011
@@ -121,11 +121,52 @@ public final class BloomFilterFactory {
     return conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true);
   }
 
+  /**
+   * @return the Bloom filter error rate in the given configuration
+   */
   public static float getErrorRate(Configuration conf) {
     return conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
   }
 
   /**
+   * @return the value for Bloom filter max fold in the given configuration
+   */
+  public static int getMaxFold(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, MAX_ALLOWED_FOLD_FACTOR);
+  }
+
+  /** @return the compound Bloom filter block size from the configuration */
+  public static int getBloomBlockSize(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
+  }
+
+  /** @return whether to cache compound Bloom filter chunks on write */
+  public static boolean cacheChunksOnWrite(Configuration conf) {
+    return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
+  }
+
+  /**
+   * @return the maximum number of keys for the Bloom filter from the configuration
+   */
+  public static int getMaxKeys(Configuration conf) {
+    return conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS, 128 * 1000 * 1000);
+  }
+
+  /**
+   * Copy the Bloom filter related configuration from fromConf to toConf.
+   * @param fromConf the configuration to copy from
+   * @param toConf the configuration to copy to
+   */
+  public static void copyBloomFilterConf(Configuration fromConf, Configuration toConf) {
+    toConf.setBoolean(IO_STOREFILE_BLOOM_ENABLED, isBloomEnabled(fromConf));
+    toConf.setFloat(IO_STOREFILE_BLOOM_ERROR_RATE, getErrorRate(fromConf));
+    toConf.setInt(IO_STOREFILE_BLOOM_MAX_FOLD, getMaxFold(fromConf));
+    toConf.setInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, getBloomBlockSize(fromConf));
+    toConf.setBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, cacheChunksOnWrite(fromConf));
+    toConf.setInt(IO_STOREFILE_BLOOM_MAX_KEYS, getMaxKeys(fromConf));
+  }
+
+  /**
    * Creates a new Bloom filter at the time of
    * {@link org.apache.hadoop.hbase.regionserver.StoreFile} writing.
    *
@@ -161,8 +202,7 @@ public final class BloomFilterFactory {
       err = (float) (1 - Math.sqrt(1 - err));
     }
 
-    int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD,
-        MAX_ALLOWED_FOLD_FACTOR);
+    int maxFold = getMaxFold(conf);
 
     if (HFile.getFormatVersion(conf) > HFile.MIN_FORMAT_VERSION) {
       // In case of compound Bloom filters we ignore the maxKeys hint.
@@ -175,8 +215,7 @@ public final class BloomFilterFactory {
     } else {
       // A single-block Bloom filter. Only used when testing HFile format
       // version 1.
-      int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS,
-          128 * 1000 * 1000);
+      int tooBig = getMaxKeys(conf);
 
       if (maxKeys <= 0) {
         LOG.warn("Invalid maximum number of keys specified: " + maxKeys
@@ -195,14 +234,5 @@ public final class BloomFilterFactory {
     return null;
   }
 
-  /** @return the compound Bloom filter block size from the configuration */
-  public static int getBloomBlockSize(Configuration conf) {
-    return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
-  }
-
-  /** @return whether to cache compound Bloom filter chunks on write */
-  public static boolean cacheChunksOnWrite(Configuration conf) {
-    return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
-  }
 
 };

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:20:53 2011
@@ -56,6 +56,9 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -82,7 +85,8 @@ public class TestHFileOutputFormat  {
 
   private static final byte[][] FAMILIES
     = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
-      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))
+      , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-C"))};
   private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
 
   private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -552,6 +556,105 @@ public class TestHFileOutputFormat  {
     return supportedAlgos.toArray(new Compression.Algorithm[0]);
   }
 
+  private void setupColumnFamiliesBloomType(HTable table,
+    Map<String, BloomType> familyToBloom) throws IOException
+  {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, BloomType> entry : familyToBloom.entrySet()) {
+      mockTableDescriptor.addFamily(
+          new HColumnDescriptor(entry.getKey().getBytes(), 1,
+              Compression.Algorithm.NONE.getName(), false,
+              false, 0, entry.getValue().toString()));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  /**
+   * Test that {@link HFileOutputFormat} RecordWriter uses bloomfilter settings
+   * from the column family descriptor
+   */
+  @Test
+  public void testColumnFamilyBloomFilter()
+      throws IOException, InterruptedException {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+        HBaseTestingUtility.getTestDir("testColumnFamilyBloomFilter");
+
+    HTable table = Mockito.mock(HTable.class);
+
+    Map<String, BloomType> configuredBloomFilter =
+      new HashMap<String, BloomType>();
+    BloomType [] bloomTypeValues = BloomType.values();
+
+    int familyIndex = 0;
+    for (byte[] family : FAMILIES) {
+      configuredBloomFilter.put(Bytes.toString(family),
+        bloomTypeValues[familyIndex++ % bloomTypeValues.length]);
+    }
+
+    setupColumnFamiliesBloomType(table, configuredBloomFilter);
+
+    // set up the table to return some mock keys
+    setupMockStartKeys(table);
+
+    try {
+      // partial map red setup to get an operational writer for testing
+      Job job = new Job(conf, "testLocalMRIncrementalLoad");
+      setupRandomGeneratorMapper(job);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = new TaskAttemptContext(job.getConfiguration(),
+          new TaskAttemptID());
+      HFileOutputFormat hof = new HFileOutputFormat();
+      writer = hof.getRecordWriter(context);
+
+      // write out random rows
+      writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+      writer.close(context);
+
+      // Make sure that a directory was created for every CF
+      FileSystem fileSystem = dir.getFileSystem(conf);
+
+      // commit so that the filesystem has one directory per column family
+      hof.getOutputCommitter(context).commitTask(context);
+      for (byte[] family : FAMILIES) {
+        String familyStr = new String(family);
+        boolean found = false;
+        for (FileStatus f : fileSystem.listStatus(dir)) {
+
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            // we found a matching directory
+            found = true;
+
+            // verify that the bloomfilter type on this file matches the
+            // configured bloom type.
+            Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
+            StoreFile.Reader reader = new StoreFile.Reader(fileSystem,
+                dataFilePath, null, false, true);
+            Map<byte[], byte[]> metadataMap = reader.loadFileInfo();
+
+            assertTrue("timeRange is not set",
+                metadataMap.get(StoreFile.TIMERANGE_KEY) != null);
+            assertEquals("Incorrect bloom type used for column family " +
+                         familyStr + " (reader: " + reader + ")",
+                         configuredBloomFilter.get(familyStr),
+                         reader.getBloomFilterType());
+            break;
+          }
+        }
+
+        if (!found) {
+          fail("HFile for column family " + familyStr + " not found");
+        }
+      }
+
+    } finally {
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
 
   /**
    * Write random values to the writer assuming a table created using