Posted to commits@hbase.apache.org by te...@apache.org on 2012/12/28 06:23:42 UTC

svn commit: r1426406 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java

Author: tedyu
Date: Fri Dec 28 05:23:42 2012
New Revision: 1426406

URL: http://svn.apache.org/viewvc?rev=1426406&view=rev
Log:
HBASE-3776 Add Bloom Filter Support to HFileOutputFormat (Anoop Sam John)
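
In short: the record writer now goes through StoreFile.Writer rather than a
bare HFile.Writer, so the bloom filter type declared on each column family is
honored in the HFiles produced for bulk load, and the timestamp range is
tracked per file by StoreFile.Writer itself instead of by a single
TimeRangeTracker field shared across all of the task's writers. Nothing new is
required of callers beyond declaring the bloom type on the family before
configuring the job; a minimal sketch against the trunk API of the time (table
and family names are illustrative, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = HBaseConfiguration.create();
    // The family's bloom type now survives into the bulk-loaded HFiles.
    HColumnDescriptor family = new HColumnDescriptor("cf");
    family.setBloomFilterType(StoreFile.BloomType.ROW);  // or ROWCOL / NONE
    // ... create (or alter) the table with this descriptor, then:
    HTable table = new HTable(conf, "mytable");
    Job job = new Job(conf, "bulkload");
    // Serializes compression and, with this patch, bloom type per family
    // into the job configuration (see configureBloomType below).
    HFileOutputFormat.configureIncrementalLoad(job, table);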


Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1426406&r1=1426405&r2=1426406&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Dec 28 05:23:42 2012
@@ -49,13 +49,15 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -82,7 +84,7 @@ import org.apache.hadoop.mapreduce.lib.p
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
   static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
-  TimeRangeTracker trt = new TimeRangeTracker();
+  private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
   private static final String DATABLOCK_ENCODING_CONF_KEY =
      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
 
@@ -106,6 +108,7 @@ public class HFileOutputFormat extends F
 
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
 
     String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
     final HFileDataBlockEncoder encoder;
@@ -166,7 +169,6 @@ public class HFileOutputFormat extends F
 
         // we now have the proper HLog writer. full steam ahead
         kv.updateLatestStamp(this.now);
-        trt.includeTimestamp(kv);
         wl.writer.append(kv);
         wl.written += length;
 
@@ -187,9 +189,9 @@ public class HFileOutputFormat extends F
         this.rollRequested = false;
       }
 
-      /* Create a new HFile.Writer.
+      /* Create a new StoreFile.Writer.
        * @param family
-       * @return A WriterLength, containing a new HFile.Writer.
+       * @return A WriterLength, containing a new StoreFile.Writer.
        * @throws IOException
        */
       private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +200,28 @@ public class HFileOutputFormat extends F
         Path familydir = new Path(outputdir, Bytes.toString(family));
         String compression = compressionMap.get(family);
         compression = compression == null ? defaultCompression : compression;
-        wl.writer = HFile.getWriterFactoryNoCache(conf)
-            .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
-            .withBlockSize(blocksize)
-            .withCompression(compression)
-            .withComparator(KeyValue.KEY_COMPARATOR)
+        String bloomTypeStr = bloomTypeMap.get(family);
+        BloomType bloomType = BloomType.NONE;
+        if (bloomTypeStr != null) {
+          bloomType = BloomType.valueOf(bloomTypeStr);
+        }
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+            .withOutputDir(familydir)
+            .withCompression(AbstractHFileWriter.compressionByName(compression))
+            .withBloomType(bloomType)
+            .withComparator(KeyValue.COMPARATOR)
             .withDataBlockEncoder(encoder)
             .withChecksumType(HStore.getChecksumType(conf))
             .withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
-            .create();
+            .build();
+        
         this.writers.put(family, wl);
         return wl;
       }
 
-      private void close(final HFile.Writer w) throws IOException {
+      private void close(final StoreFile.Writer w) throws IOException {
         if (w != null) {
           w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
               Bytes.toBytes(System.currentTimeMillis()));
@@ -221,8 +231,7 @@ public class HFileOutputFormat extends F
               Bytes.toBytes(true));
           w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
               Bytes.toBytes(compactionExclude));
-          w.appendFileInfo(StoreFile.TIMERANGE_KEY,
-              WritableUtils.toByteArray(trt));
+          w.appendTrackedTimestampsToMetadata();
           w.close();
         }
       }
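
A note on the copied Configuration in the hunk above: StoreFile.Writer takes a
CacheConfig, and a map-reduce task has no block cache worth allocating, so the
writer is handed a conf whose block cache share is pinned to zero while the
unmodified conf still drives everything else in the builder. The idea in
isolation (a restatement of the patch, not additional code):

    Configuration tempConf = new Configuration(conf);  // copy; conf untouched
    tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
    CacheConfig writerCacheConf = new CacheConfig(tempConf);  // cache disabled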
@@ -241,7 +250,7 @@ public class HFileOutputFormat extends F
    */
   static class WriterLength {
     long written = 0;
-    HFile.Writer writer = null;
+    StoreFile.Writer writer = null;
   }
 
   /**
@@ -359,7 +368,8 @@ public class HFileOutputFormat extends F
 
     // Set compression algorithms based on column families
     configureCompression(table, conf);
-
+    configureBloomType(table, conf);
+    
     TableMapReduceUtil.addDependencyJars(job);
     LOG.info("Incremental table output configured.");
   }
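
After configureIncrementalLoad() returns, the job configuration carries one
serialized map per concern, in the same family=value&family=value encoding
(family names and values URL-encoded; the entries below are illustrative):

    hbase.hfileoutputformat.families.compression = cf1=GZ&cf2=NONE
    hbase.hfileoutputformat.families.bloomtype   = cf1=ROW&cf2=ROWCOL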
@@ -375,25 +385,39 @@ public class HFileOutputFormat extends F
    *         algorithm
    */
   static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
-    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
-    for (String familyConf : compressionConf.split("&")) {
+    return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+  }
+
+  private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+    return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+  }
+  
+  /**
+   * Runs inside the task to deserialize a column family to conf value map
+   * from the named configuration property.
+   * @param conf configuration to read from
+   * @param confName conf key under which the map was serialized
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
       String[] familySplit = familyConf.split("=");
       if (familySplit.length != 2) {
         continue;
       }
-
       try {
-        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
             URLDecoder.decode(familySplit[1], "UTF-8"));
       } catch (UnsupportedEncodingException e) {
         // will not happen with UTF-8 encoding
         throw new AssertionError(e);
       }
     }
-    return compressionMap;
+    return confValMap;
   }
-
+  
   /**
    * Serialize column family to compression algorithm map to configuration.
    * Invoked while configuring the MR job for incremental load.
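
For reference, a hedged round-trip through the generic parser above (the
method is private, so this only illustrates the format it accepts):

    Configuration conf = new Configuration();
    conf.set("hbase.hfileoutputformat.families.bloomtype", "cf1=ROW&cf2=ROWCOL");
    // createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY) yields a
    // TreeMap<byte[], String> over Bytes.BYTES_COMPARATOR containing:
    //   Bytes.toBytes("cf1") -> "ROW"
    //   Bytes.toBytes("cf2") -> "ROWCOL"
    // Pairs without an '=' are silently skipped.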
@@ -423,4 +447,35 @@ public class HFileOutputFormat extends F
     // Get rid of the last ampersand
     conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
   }
+  
+  /**
+   * Serialize column family to bloom type map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureBloomType(HTable table, Configuration conf) throws IOException {
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if (tableDescriptor == null) {
+      // could happen with mock table instance
+      return;
+    }
+    StringBuilder bloomTypeConfigValue = new StringBuilder();
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        bloomTypeConfigValue.append('&');
+      }
+      bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      bloomTypeConfigValue.append('=');
+      BloomType bloomType = familyDescriptor.getBloomFilterType();
+      // guard the null case before calling toString()
+      String bloomTypeStr = bloomType == null
+          ? HColumnDescriptor.DEFAULT_BLOOMFILTER : bloomType.toString();
+      bloomTypeConfigValue.append(URLEncoder.encode(bloomTypeStr, "UTF-8"));
+    }
+    conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+  }
 }
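
Downstream, nothing changes for callers: the HFiles written by the job now
carry bloom filter metadata and the tracked timestamp range, and are bulk
loaded exactly as before. A minimal sketch, assuming the usual
completebulkload path and eliding exception handling:

    // outputDir is the job's output directory; names are illustrative.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(new Path(outputDir), table);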