Posted to commits@hbase.apache.org by te...@apache.org on 2012/12/28 06:23:42 UTC
svn commit: r1426406 -
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Author: tedyu
Date: Fri Dec 28 05:23:42 2012
New Revision: 1426406
URL: http://svn.apache.org/viewvc?rev=1426406&view=rev
Log:
HBASE-3776 Add Bloom Filter Support to HFileOutputFormat (Anoop Sam John)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1426406&r1=1426405&r2=1426406&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Fri Dec 28 05:23:42 2012
@@ -49,13 +49,15 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -82,7 +84,7 @@ import org.apache.hadoop.mapreduce.lib.p
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
- TimeRangeTracker trt = new TimeRangeTracker();
+ private static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
private static final String DATABLOCK_ENCODING_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.datablock.encoding";
@@ -106,6 +108,7 @@ public class HFileOutputFormat extends F
// create a map from column family to the compression algorithm
final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+ final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
final HFileDataBlockEncoder encoder;
@@ -166,7 +169,6 @@ public class HFileOutputFormat extends F
// we now have the proper HLog writer. full steam ahead
kv.updateLatestStamp(this.now);
- trt.includeTimestamp(kv);
wl.writer.append(kv);
wl.written += length;
@@ -187,9 +189,9 @@ public class HFileOutputFormat extends F
this.rollRequested = false;
}
- /* Create a new HFile.Writer.
+ /* Create a new StoreFile.Writer.
* @param family
- * @return A WriterLength, containing a new HFile.Writer.
+ * @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
private WriterLength getNewWriter(byte[] family, Configuration conf)
@@ -198,20 +200,28 @@ public class HFileOutputFormat extends F
Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
- wl.writer = HFile.getWriterFactoryNoCache(conf)
- .withPath(fs, StoreFile.getUniqueFile(fs, familydir))
- .withBlockSize(blocksize)
- .withCompression(compression)
- .withComparator(KeyValue.KEY_COMPARATOR)
+ String bloomTypeStr = bloomTypeMap.get(family);
+ BloomType bloomType = BloomType.NONE;
+ if (bloomTypeStr != null) {
+ bloomType = BloomType.valueOf(bloomTypeStr);
+ }
+ Configuration tempConf = new Configuration(conf);
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
+ .withOutputDir(familydir)
+ .withCompression(AbstractHFileWriter.compressionByName(compression))
+ .withBloomType(bloomType)
+ .withComparator(KeyValue.COMPARATOR)
.withDataBlockEncoder(encoder)
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerChecksum(HStore.getBytesPerChecksum(conf))
- .create();
+ .build();
+
this.writers.put(family, wl);
return wl;
}
- private void close(final HFile.Writer w) throws IOException {
+ private void close(final StoreFile.Writer w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
@@ -221,8 +231,7 @@ public class HFileOutputFormat extends F
Bytes.toBytes(true));
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
- w.appendFileInfo(StoreFile.TIMERANGE_KEY,
- WritableUtils.toByteArray(trt));
+ w.appendTrackedTimestampsToMetadata();
w.close();
}
}
@@ -241,7 +250,7 @@ public class HFileOutputFormat extends F
*/
static class WriterLength {
long written = 0;
- HFile.Writer writer = null;
+ StoreFile.Writer writer = null;
}
/**
@@ -359,7 +368,8 @@ public class HFileOutputFormat extends F
// Set compression algorithms based on column families
configureCompression(table, conf);
-
+ configureBloomType(table, conf);
+
TableMapReduceUtil.addDependencyJars(job);
LOG.info("Incremental table output configured.");
}
@@ -375,25 +385,39 @@ public class HFileOutputFormat extends F
* algorithm
*/
static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
- Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
- String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
- for (String familyConf : compressionConf.split("&")) {
+ return createFamilyConfValueMap(conf, COMPRESSION_CONF_KEY);
+ }
+
+ private static Map<byte[], String> createFamilyBloomMap(Configuration conf) {
+ return createFamilyConfValueMap(conf, BLOOM_TYPE_CONF_KEY);
+ }
+
+ /**
+ * Runs inside the task to deserialize the column family to
+ * conf value map stored under the given conf key.
+ * @param conf the job configuration
+ * @param confName conf key under which the serialized map is stored
+ * @return a map from column family name to the configured value
+ */
+ private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
+ Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+ String confVal = conf.get(confName, "");
+ for (String familyConf : confVal.split("&")) {
String[] familySplit = familyConf.split("=");
if (familySplit.length != 2) {
continue;
}
-
try {
- compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
throw new AssertionError(e);
}
}
- return compressionMap;
+ return confValMap;
}
-
+
/**
* Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
@@ -423,4 +447,35 @@ public class HFileOutputFormat extends F
// Get rid of the last ampersand
conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
}
+
+ /**
+ * Serialize column family to bloom type map to configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ static void configureBloomType(HTable table, Configuration conf) throws IOException {
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ return;
+ }
+ StringBuilder bloomTypeConfigValue = new StringBuilder();
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ bloomTypeConfigValue.append('&');
+ }
+ bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+ bloomTypeConfigValue.append('=');
+ BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+ // check for null before calling toString()
+ String bloomType = (bloomFilterType == null)
+ ? HColumnDescriptor.DEFAULT_BLOOMFILTER : bloomFilterType.toString();
+ bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+ }
+ conf.set(BLOOM_TYPE_CONF_KEY, bloomTypeConfigValue.toString());
+ }
}
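
For context, the sketch below shows how a bulk-load job picks up this change. It is a minimal illustration, not part of the commit: the table name "t1", the class name and the job name are made up, and mapper/input setup is omitted. configureIncrementalLoad() now serializes each family's bloom type alongside its compression into the job configuration (URL-encoded "family=value" pairs joined by '&', e.g. "cf1=ROW&cf2=ROWCOL" under hbase.hfileoutputformat.families.bloomtype), and each reducer's getNewWriter() decodes that map and passes BloomType.valueOf(value) to the StoreFile.WriterBuilder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadWithBlooms {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "bulk-load-with-blooms");
        // Hypothetical pre-created table; its HColumnDescriptors carry
        // the per-family bloom filter types (e.g. ROW or ROWCOL).
        HTable table = new HTable(conf, "t1");
        // Configures the reducer, partitioner and output format, and (with
        // this commit) also serializes the per-family bloom types into conf.
        HFileOutputFormat.configureIncrementalLoad(job, table);
        // ... set the mapper, input format and input paths here ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

HFiles written by such a job carry the same bloom filters a flush or compaction would have produced, so reads benefit immediately after the bulk load.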