Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:20:54 UTC
svn commit: r1181566 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/mapreduce/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: nspiegelberg
Date: Tue Oct 11 02:20:53 2011
New Revision: 1181566
URL: http://svn.apache.org/viewvc?rev=1181566&view=rev
Log:
Add BloomFilter and timestamp range support for bulk import
Summary:
Add BloomFilter and timestamp range support for bulk import.
This is the version updated for HFile v2.
Test Plan:
unit tests.
Reviewed By: kannan
Reviewers: nspiegelberg, kannan, kranganathan, mbautin
Commenters: kranganathan, nspiegelberg, mbautin
CC: hbase@lists, kranganathan, nspiegelberg, gqchen, mbautin, kannan
Differential Revision: 237110
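
For context, a minimal sketch of a driver that would exercise this change end to end. The class name, table name, and output path are illustrative only; the API calls are the 0.89 mapreduce API touched by this patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class BulkImportDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = new Job(conf, "bulk-import");
      // "TestTable" is a placeholder; any table whose column families
      // declare BLOOMFILTER types in their descriptors will do.
      HTable table = new HTable(conf, "TestTable");
      // With this patch, configureIncrementalLoad() also serializes each
      // family's Bloom filter type (plus the Bloom tuning keys) into the
      // job configuration, so the reducers' writers can pick them up.
      HFileOutputFormat.configureIncrementalLoad(job, table);
      FileOutputFormat.setOutputPath(job, new Path("/tmp/bulk-import-out"));
      job.waitForCompletion(true);
    }
  }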
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:20:53 2011
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -70,6 +72,9 @@ import org.apache.commons.logging.LogFac
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+ // This stores a string in the format family1=bloomType1&family2=bloomType2&...&familyN=bloomTypeN
+ static final String BLOOMFILTER_TYPE_PER_CF_KEY =
+ "hbase.hfileoutputformat.families.bloomfilter.typePerCF";
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
throws IOException, InterruptedException {
@@ -90,7 +95,7 @@ public class HFileOutputFormat extends F
// create a map from column family to the compression algorithm
final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
- final int bytesPerChecksum = HFile.getBytesPerChecksum(conf, conf);
+ final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
// Map of families to writers and how much has been output on the writer.
@@ -102,6 +107,7 @@ public class HFileOutputFormat extends F
public void write(ImmutableBytesWritable row, KeyValue kv)
throws IOException {
+
// null input == user explicitly wants to flush
if (row == null && kv == null) {
rollWriters();
@@ -167,21 +173,35 @@ public class HFileOutputFormat extends F
Path familydir = new Path(outputdir, Bytes.toString(family));
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
- wl.writer = HFile.getWriterFactory(conf).createWriter(fs,
- StoreFile.getUniqueFile(fs, familydir), blocksize,
- bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+ Compression.Algorithm compressionAlgo =
+ Compression.getCompressionAlgorithmByName(compression);
+
+ BloomType bloomType = bloomTypeMap.get(family);
+ if (bloomType == null) {
+ bloomType = BloomType.NONE;
+ }
+
+ /* The new Bloom filter does not require maxKeys. */
+ int maxKeys = 0;
+ wl.writer = StoreFile.createWriter(fs, familydir, blocksize,
+ compressionAlgo, KeyValue.COMPARATOR, conf, bloomType, maxKeys);
this.writers.put(family, wl);
return wl;
}
- private void close(final HFile.Writer w) throws IOException {
+ private void close(final StoreFile.Writer w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
- Bytes.toBytes(true));
+
+ /* Set maxSequenceId to 0 for bulk imported files, since
+ * these files do not correspond to any edit log items.
+ *
+ * Set the majorCompaction flag to false for bulk imported files.
+ */
+ w.appendMetadata(0, false);
w.close();
}
}
@@ -200,7 +220,7 @@ public class HFileOutputFormat extends F
*/
static class WriterLength {
long written = 0;
- HFile.Writer writer = null;
+ StoreFile.Writer writer = null;
}
/**
@@ -317,6 +337,10 @@ public class HFileOutputFormat extends F
// Set compression algorithms based on column families
configureCompression(table, conf);
+ // Set BloomFilter type based on column families and
+ // relevant parameters.
+ configureBloomFilter(table, conf);
+
LOG.info("Incremental table output configured.");
}
@@ -379,4 +403,66 @@ public class HFileOutputFormat extends F
// Get rid of the last ampersand
conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
}
+
+ private static void configureBloomFilter(HTable table, Configuration conf)
+ throws IOException {
+ // get conf information needed by BloomFilter
+ Configuration tableConf = table.getConfiguration();
+
+ // Now go through the column family and save the BloomFilter setting for
+ // each column family
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null){
+ return;
+ }
+
+ if (tableConf != null) {
+ // copying Bloom filter related configuration to conf.
+ BloomFilterFactory.copyBloomFilterConf(tableConf, conf);
+ }
+
+ StringBuilder bloomfilterTypePerCFConfigValue = new StringBuilder();
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ bloomfilterTypePerCFConfigValue.append('&');
+ }
+
+ bloomfilterTypePerCFConfigValue.append(
+ URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+ bloomfilterTypePerCFConfigValue.append('=');
+ bloomfilterTypePerCFConfigValue.append(
+ URLEncoder.encode(familyDescriptor.getBloomFilterType().toString(),
+ "UTF-8"));
+ }
+
+ conf.set(BLOOMFILTER_TYPE_PER_CF_KEY, bloomfilterTypePerCFConfigValue.toString());
+ }
+
+ static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+ Map<byte[], BloomType> bloomTypeMap =
+ new TreeMap<byte[], BloomType >(Bytes.BYTES_COMPARATOR);
+ String bloomFilterTypeConf = conf.get(BLOOMFILTER_TYPE_PER_CF_KEY, "");
+
+ if (bloomFilterTypeConf.isEmpty()) {
+ return bloomTypeMap;
+ }
+
+ for (String familyConf : bloomFilterTypeConf.split("&")) {
+ String[] familySplit = familyConf.split("=");
+ if (familySplit.length != 2) {
+ throw new AssertionError("invalid bloomfilter type configuration");
+ }
+
+ try {
+ bloomTypeMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ BloomType.valueOf(URLDecoder.decode(familySplit[1], "UTF-8")));
+ } catch (UnsupportedEncodingException e) {
+ // will not happen with UTF-8 encoding
+ throw new AssertionError(e);
+ }
+ }
+ return bloomTypeMap;
+ }
}
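
The per-family settings above travel through the job configuration as a single URL-encoded string; here is a standalone sketch of the round trip implemented by configureBloomFilter() and createFamilyBloomTypeMap(). The family names "info" and "data" are illustrative:

  import java.net.URLDecoder;
  import java.net.URLEncoder;

  // Encoding side (configureBloomFilter): family names and Bloom types
  // are URL-encoded and joined as family1=bloomType1&family2=bloomType2.
  String value = URLEncoder.encode("info", "UTF-8") + "="
      + URLEncoder.encode("ROWCOL", "UTF-8") + "&"
      + URLEncoder.encode("data", "UTF-8") + "="
      + URLEncoder.encode("NONE", "UTF-8");
  // value is now "info=ROWCOL&data=NONE"

  // Decoding side (createFamilyBloomTypeMap): split on '&', then on '=',
  // and URL-decode each half before building the family-to-type map.
  for (String familyConf : value.split("&")) {
    String[] kv = familyConf.split("=");
    String family = URLDecoder.decode(kv[0], "UTF-8");
    String bloomType = URLDecoder.decode(kv[1], "UTF-8");
  }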
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 11 02:20:53 2011
@@ -116,7 +116,7 @@ public class StoreFile {
private static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
/** Key for Timerange information in metadata*/
- static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
+ public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
// Need to make it 8k for testing.
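
The only change here widens visibility: TIMERANGE_KEY becomes public so code outside the regionserver package (such as the test below) can assert that bulk-loaded files carry timestamp range metadata, along these lines (reader is a StoreFile.Reader):

  Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
  // Non-null only if the writer tracked and flushed a timestamp range.
  byte[] timeRange = fileInfo.get(StoreFile.TIMERANGE_KEY);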
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/BloomFilterFactory.java Tue Oct 11 02:20:53 2011
@@ -121,11 +121,52 @@ public final class BloomFilterFactory {
return conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true);
}
+ /**
+ * @return the Bloom filter error rate in the given configuration
+ */
public static float getErrorRate(Configuration conf) {
return conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float) 0.01);
}
/**
+ * @return the value for Bloom filter max fold in the given configuration
+ */
+ public static int getMaxFold(Configuration conf) {
+ return conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, MAX_ALLOWED_FOLD_FACTOR);
+ }
+
+ /** @return the compound Bloom filter block size from the configuration */
+ public static int getBloomBlockSize(Configuration conf) {
+ return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
+ }
+
+ /** @return whether to cache compound Bloom filter chunks on write */
+ public static boolean cacheChunksOnWrite(Configuration conf) {
+ return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
+ }
+
+ /**
+ * @return the maximum number of keys for the Bloom filter from the given configuration
+ */
+ public static int getMaxKeys(Configuration conf) {
+ return conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS, 128 * 1000 * 1000);
+ }
+
+ /**
+ * Copy the BloomFilter related configuration from fromConf to toConf
+ * @param fromConf conf we will copy from
+ * @param toConf conf we will copy to
+ */
+ public static void copyBloomFilterConf(Configuration fromConf, Configuration toConf) {
+ toConf.setBoolean(IO_STOREFILE_BLOOM_ENABLED, isBloomEnabled(fromConf));
+ toConf.setFloat(IO_STOREFILE_BLOOM_ERROR_RATE, getErrorRate(fromConf));
+ toConf.setInt(IO_STOREFILE_BLOOM_MAX_FOLD, getMaxFold(fromConf));
+ toConf.setInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, getBloomBlockSize(fromConf));
+ toConf.setBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, cacheChunksOnWrite(fromConf));
+ toConf.setInt(IO_STOREFILE_BLOOM_MAX_KEYS, getMaxKeys(fromConf));
+ }
+
+ /**
* Creates a new Bloom filter at the time of
* {@link org.apache.hadoop.hbase.regionserver.StoreFile} writing.
*
@@ -161,8 +202,7 @@ public final class BloomFilterFactory {
err = (float) (1 - Math.sqrt(1 - err));
}
- int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD,
- MAX_ALLOWED_FOLD_FACTOR);
+ int maxFold = getMaxFold(conf);
if (HFile.getFormatVersion(conf) > HFile.MIN_FORMAT_VERSION) {
// In case of compound Bloom filters we ignore the maxKeys hint.
@@ -175,8 +215,7 @@ public final class BloomFilterFactory {
} else {
// A single-block Bloom filter. Only used when testing HFile format
// version 1.
- int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS,
- 128 * 1000 * 1000);
+ int tooBig = getMaxKeys(conf);
if (maxKeys <= 0) {
LOG.warn("Invalid maximum number of keys specified: " + maxKeys
@@ -195,14 +234,5 @@ public final class BloomFilterFactory {
return null;
}
- /** @return the compound Bloom filter block size from the configuration */
- public static int getBloomBlockSize(Configuration conf) {
- return conf.getInt(IO_STOREFILE_BLOOM_BLOCK_SIZE, 128 * 1024);
- }
-
- /** @return whether to cache compound Bloom filter chunks on write */
- public static boolean cacheChunksOnWrite(Configuration conf) {
- return conf.getBoolean(IO_STOREFILE_BLOOM_CACHE_ON_WRITE, false);
- }
};
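
A short usage sketch for the new copyBloomFilterConf() helper: it reads each Bloom-related key from one Configuration (falling back to the defaults above) and writes it into another, which is how configureBloomFilter() pushes the table's Bloom tuning into the job configuration:

  Configuration tableConf = table.getConfiguration();
  Configuration jobConf = job.getConfiguration();
  // Mirrors the enabled flag, error rate, max fold, block size,
  // cache-on-write flag, and max keys from tableConf into jobConf.
  BloomFilterFactory.copyBloomFilterConf(tableConf, jobConf);
  // The job side now reports the same tuning, e.g.:
  float errorRate = BloomFilterFactory.getErrorRate(jobConf);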
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181566&r1=1181565&r2=1181566&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:20:53 2011
@@ -56,6 +56,9 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
@@ -82,7 +85,8 @@ public class TestHFileOutputFormat {
private static final byte[][] FAMILIES
= { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
- , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
+ , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))
+ , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-C"))};
private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -552,6 +556,105 @@ public class TestHFileOutputFormat {
return supportedAlgos.toArray(new Compression.Algorithm[0]);
}
+ private void setupColumnFamiliesBloomType(HTable table,
+ Map<String, BloomType> familyToBloom) throws IOException
+ {
+ HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+ for (Entry<String, BloomType> entry : familyToBloom.entrySet()) {
+ mockTableDescriptor.addFamily(
+ new HColumnDescriptor(entry.getKey().getBytes(), 1,
+ Compression.Algorithm.NONE.getName(), false,
+ false, 0, entry.getValue().toString()));
+ }
+ Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+ }
+
+ /**
+ * Test that {@link HFileOutputFormat} RecordWriter uses bloomfilter settings
+ * from the column family descriptor
+ */
+ @Test
+ public void testColumnFamilyBloomFilter()
+ throws IOException, InterruptedException {
+ Configuration conf = new Configuration(this.util.getConfiguration());
+ RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+ TaskAttemptContext context = null;
+ Path dir =
+ HBaseTestingUtility.getTestDir("testColumnFamilyBloomFilter");
+
+ HTable table = Mockito.mock(HTable.class);
+
+ Map<String, BloomType> configuredBloomFilter =
+ new HashMap<String, BloomType>();
+ BloomType [] bloomTypeValues = BloomType.values();
+
+ int familyIndex = 0;
+ for (byte[] family : FAMILIES) {
+ configuredBloomFilter.put(Bytes.toString(family),
+ bloomTypeValues[familyIndex++ % bloomTypeValues.length]);
+ }
+
+ setupColumnFamiliesBloomType(table, configuredBloomFilter);
+
+ // set up the table to return some mock keys
+ setupMockStartKeys(table);
+
+ try {
+ // partial map red setup to get an operational writer for testing
+ Job job = new Job(conf, "testLocalMRIncrementalLoad");
+ setupRandomGeneratorMapper(job);
+ HFileOutputFormat.configureIncrementalLoad(job, table);
+ FileOutputFormat.setOutputPath(job, dir);
+ context = new TaskAttemptContext(job.getConfiguration(),
+ new TaskAttemptID());
+ HFileOutputFormat hof = new HFileOutputFormat();
+ writer = hof.getRecordWriter(context);
+
+ // write out random rows
+ writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+ writer.close(context);
+
+ // Make sure that a directory was created for every CF
+ FileSystem fileSystem = dir.getFileSystem(conf);
+
+ // commit so that the filesystem has one directory per column family
+ hof.getOutputCommitter(context).commitTask(context);
+ for (byte[] family : FAMILIES) {
+ String familyStr = new String(family);
+ boolean found = false;
+ for (FileStatus f : fileSystem.listStatus(dir)) {
+
+ if (Bytes.toString(family).equals(f.getPath().getName())) {
+ // we found a matching directory
+ found = true;
+
+ // verify that the bloomfilter type on this file matches the
+ // configured bloom type.
+ Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
+ StoreFile.Reader reader = new StoreFile.Reader(fileSystem,
+ dataFilePath, null, false, true);
+ Map<byte[], byte[]> metadataMap = reader.loadFileInfo();
+
+ assertTrue("timeRange is not set",
+ metadataMap.get(StoreFile.TIMERANGE_KEY) != null);
+ assertEquals("Incorrect bloom type used for column family " +
+ familyStr + "(reader: " + reader + ")",
+ configuredBloomFilter.get(familyStr),
+ reader.getBloomFilterType());
+ break;
+ }
+ }
+
+ if (!found) {
+ fail("HFile for column family " + familyStr + " not found");
+ }
+ }
+
+ } finally {
+ dir.getFileSystem(conf).delete(dir, true);
+ }
+ }
+
/**
* Write random values to the writer assuming a table created using