Posted to commits@hbase.apache.org by st...@apache.org on 2011/03/24 23:59:45 UTC
svn commit: r1085179 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Author: stack
Date: Thu Mar 24 22:59:45 2011
New Revision: 1085179
URL: http://svn.apache.org/viewvc?rev=1085179&view=rev
Log:
HBASE-3474 HFileOutputFormat to use column family's compression algorithm
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1085179&r1=1085178&r2=1085179&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Mar 24 22:59:45 2011
@@ -97,6 +97,7 @@ Release 0.91.0 - Unreleased
number of maps
HBASE-3673 Reduce HTable Pool Contention Using Concurrent Collections
(Karthick Sankarachary via Stack)
+ HBASE-3474 HFileOutputFormat to use column family's compression algorithm
TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1085179&r1=1085178&r2=1085179&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Thu Mar 24 22:59:45 2011
@@ -1216,6 +1216,10 @@ public class HFile {
return this.comparator;
}
+ public Compression.Algorithm getCompressionAlgorithm() {
+ return this.compressAlgo;
+ }
+
/**
* @return index size
*/
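[Editorially added example] The new accessor exposes the compression algorithm recorded in the HFile trailer. A minimal read-back sketch (the FileSystem handle "fs" and the path "hfilePath" are placeholders; loadFileInfo() must run first so the trailer is parsed, mirroring the test below):

    HFile.Reader reader = new HFile.Reader(fs, hfilePath, null, false, true);
    reader.loadFileInfo(); // parses the trailer, including the compression algorithm
    Compression.Algorithm algo = reader.getCompressionAlgorithm();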
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1085179&r1=1085178&r2=1085179&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Mar 24 22:59:45 2011
@@ -20,9 +20,13 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -32,7 +36,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -64,6 +70,7 @@ import org.apache.commons.logging.LogFac
*/
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+ static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
throws IOException, InterruptedException {
@@ -78,8 +85,11 @@ public class HFileOutputFormat extends F
final int blocksize = conf.getInt("hfile.min.blocksize.size",
HFile.DEFAULT_BLOCKSIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
- final String compression = conf.get("hfile.compression",
- Compression.Algorithm.NONE.getName());
+ final String defaultCompression = conf.get("hfile.compression",
+ Compression.Algorithm.NONE.getName());
+
+ // create a map from column family to the compression algorithm
+ final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
// Map of families to writers and how much has been output on the writer.
@@ -153,6 +163,8 @@ public class HFileOutputFormat extends F
private WriterLength getNewWriter(byte[] family) throws IOException {
WriterLength wl = new WriterLength();
Path familydir = new Path(outputdir, Bytes.toString(family));
+ String compression = compressionMap.get(family);
+ compression = compression == null ? defaultCompression : compression;
wl.writer = new HFile.Writer(fs,
StoreFile.getUniqueFile(fs, familydir), blocksize,
compression, KeyValue.KEY_COMPARATOR);
@@ -300,7 +312,69 @@ public class HFileOutputFormat extends F
DistributedCache.addCacheFile(cacheUri, conf);
DistributedCache.createSymlink(conf);
+ // Set compression algorithms based on column families
+ configureCompression(table, conf);
+
LOG.info("Incremental table output configured.");
}
+ /**
+ * Runs inside the task to deserialize the column family to compression
+ * algorithm map from the configuration.
+ *
+ * Package-private for unit tests only.
+ *
+ * @return a map from column family to the name of the configured compression
+ * algorithm
+ */
+ static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
+ Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+ String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
+ for (String familyConf : compressionConf.split("&")) {
+ String[] familySplit = familyConf.split("=");
+ if (familySplit.length != 2) {
+ continue;
+ }
+
+ try {
+ compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+ URLDecoder.decode(familySplit[1], "UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ // will not happen with UTF-8 encoding
+ throw new AssertionError(e);
+ }
+ }
+ return compressionMap;
+ }
+
+ /**
+ * Serializes the column family to compression algorithm map into the configuration.
+ * Invoked while configuring the MR job for incremental load.
+ *
+ * Package-private for unit tests only.
+ *
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ static void configureCompression(HTable table, Configuration conf) throws IOException {
+ StringBuilder compressionConfigValue = new StringBuilder();
+ HTableDescriptor tableDescriptor = table.getTableDescriptor();
+ if (tableDescriptor == null) {
+ // could happen with a mock table instance
+ return;
+ }
+ Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ int i = 0;
+ for (HColumnDescriptor familyDescriptor : families) {
+ if (i++ > 0) {
+ compressionConfigValue.append('&');
+ }
+ compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+ compressionConfigValue.append('=');
+ compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
+ }
+ // write the serialized family-to-compression map into the configuration
+ conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+ }
}
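[Editorially added example] For reference, the value stored under hbase.hfileoutputformat.families.compression is an '&'-separated list of URL-encoded family=algorithm pairs. A round-trip sketch (the family names "cf1" and "cf2" are invented for illustration; "gz" and "lzo" are the getName() values of the corresponding Compression.Algorithm constants):

    Configuration conf = new Configuration();
    conf.set("hbase.hfileoutputformat.families.compression", "cf1=gz&cf2=lzo");
    Map<byte[], String> m = HFileOutputFormat.createFamilyCompressionMap(conf);
    // m.get(Bytes.toBytes("cf1")) -> "gz", m.get(Bytes.toBytes("cf2")) -> "lzo"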
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1085179&r1=1085178&r2=1085179&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Thu Mar 24 22:59:45 2011
@@ -23,9 +23,14 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -36,7 +41,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -45,6 +52,10 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.NullWritable;
@@ -58,6 +69,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
/**
* Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
* Sets up and runs a mapreduce job that writes hfile output.
@@ -232,18 +245,11 @@ public class TestHFileOutputFormat {
public void testJobConfiguration() throws Exception {
Job job = new Job();
HTable table = Mockito.mock(HTable.class);
- byte[][] mockKeys = new byte[][] {
- HConstants.EMPTY_BYTE_ARRAY,
- Bytes.toBytes("aaa"),
- Bytes.toBytes("ggg"),
- Bytes.toBytes("zzz")
- };
- Mockito.doReturn(mockKeys).when(table).getStartKeys();
-
+ setupMockStartKeys(table);
HFileOutputFormat.configureIncrementalLoad(job, table);
assertEquals(job.getNumReduceTasks(), 4);
}
-
+
private byte [][] generateRandomStartKeys(int numKeys) {
Random random = new Random();
byte[][] ret = new byte[numKeys][];
@@ -372,6 +378,205 @@ public class TestHFileOutputFormat {
assertTrue(job.waitForCompletion(true));
}
+ /**
+ * Test for
+ * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
+ * that the compression map is correctly deserialized from the configuration.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testCreateFamilyCompressionMap() throws IOException {
+ for (int numCfs = 0; numCfs <= 3; numCfs++) {
+ Configuration conf = new Configuration(this.util.getConfiguration());
+ Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
+ HTable table = Mockito.mock(HTable.class);
+ setupMockColumnFamilies(table, familyToCompression);
+ HFileOutputFormat.configureCompression(table, conf);
+
+ // read back family specific compression setting from the configuration
+ Map<byte[], String> retrievedFamilyToCompressionMap = HFileOutputFormat.createFamilyCompressionMap(conf);
+
+ // verify that we have a value for every column family that matches the
+ // mock values used
+ for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
+ assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue()
+ .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+ }
+ }
+ }
+
+ private void setupMockColumnFamilies(HTable table,
+ Map<String, Compression.Algorithm> familyToCompression) throws IOException
+ {
+ HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+ for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
+ mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey().getBytes(), 1, entry.getValue().getName(),
+ false, false, 0, "none"));
+ }
+ Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+ }
+
+ private void setupMockStartKeys(HTable table) throws IOException {
+ byte[][] mockKeys = new byte[][] {
+ HConstants.EMPTY_BYTE_ARRAY,
+ Bytes.toBytes("aaa"),
+ Bytes.toBytes("ggg"),
+ Bytes.toBytes("zzz")
+ };
+ Mockito.doReturn(mockKeys).when(table).getStartKeys();
+ }
+
+ /**
+ * @return a map from column family names to compression algorithms for
+ * testing column family compression. Column family names contain special
+ * characters to exercise the URL encoding of the configuration value.
+ */
+ private Map<String, Compression.Algorithm> getMockColumnFamilies(int numCfs) {
+ Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
+ // use column family names having special characters
+ if (numCfs-- > 0) {
+ familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
+ }
+ if (numCfs-- > 0) {
+ familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
+ }
+ if (numCfs-- > 0) {
+ familyToCompression.put("Family3", Compression.Algorithm.NONE);
+ }
+ return familyToCompression;
+ }
+
+ /**
+ * Test that {@link HFileOutputFormat} RecordWriter uses compression settings
+ * from the column family descriptor
+ */
+ @Test
+ public void testColumnFamilyCompression()
+ throws IOException, InterruptedException {
+ Configuration conf = new Configuration(this.util.getConfiguration());
+ RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+ TaskAttemptContext context = null;
+ Path dir =
+ HBaseTestingUtility.getTestDir("testColumnFamilyCompression");
+
+ HTable table = Mockito.mock(HTable.class);
+
+ Map<String, Compression.Algorithm> configuredCompression =
+ new HashMap<String, Compression.Algorithm>();
+ Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms();
+
+ int familyIndex = 0;
+ for (byte[] family : FAMILIES) {
+ configuredCompression.put(Bytes.toString(family),
+ supportedAlgos[familyIndex++ % supportedAlgos.length]);
+ }
+ setupMockColumnFamilies(table, configuredCompression);
+
+ // set up the table to return some mock keys
+ setupMockStartKeys(table);
+
+ try {
+ // partial MapReduce setup to get an operational writer for testing
+ Job job = new Job(conf, "testLocalMRIncrementalLoad");
+ setupRandomGeneratorMapper(job);
+ HFileOutputFormat.configureIncrementalLoad(job, table);
+ FileOutputFormat.setOutputPath(job, dir);
+ context = new TaskAttemptContext(job.getConfiguration(),
+ new TaskAttemptID());
+ HFileOutputFormat hof = new HFileOutputFormat();
+ writer = hof.getRecordWriter(context);
+
+ // write out random rows
+ writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+ writer.close(context);
+
+ // Make sure that a directory was created for every CF
+ FileSystem fileSystem = dir.getFileSystem(conf);
+
+ // commit so that the filesystem has one directory per column family
+ hof.getOutputCommitter(context).commitTask(context);
+ for (byte[] family : FAMILIES) {
+ String familyStr = Bytes.toString(family);
+ boolean found = false;
+ for (FileStatus f : fileSystem.listStatus(dir)) {
+
+ if (familyStr.equals(f.getPath().getName())) {
+ // we found a matching directory
+ found = true;
+
+ // verify that the compression on this file matches the configured
+ // compression
+ Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
+ Reader reader = new HFile.Reader(fileSystem, dataFilePath, null, false, true);
+ reader.loadFileInfo();
+ assertEquals("Incorrect compression used for column family " + familyStr
+ + "(reader: " + reader + ")",
+ configuredCompression.get(familyStr), reader.getCompressionAlgorithm());
+ break;
+ }
+ }
+
+ if (!found) {
+ fail("HFile for column family " + familyStr + " not found");
+ }
+ }
+
+ } finally {
+ dir.getFileSystem(conf).delete(dir, true);
+ }
+ }
+
+ /**
+ * @return the compression algorithms whose compressors can actually be
+ * instantiated in the test environment
+ */
+ private Compression.Algorithm[] getSupportedCompressionAlgorithms() {
+ String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
+ List<Compression.Algorithm> supportedAlgos = Lists.newArrayList();
+
+ for (String algoName : allAlgos) {
+ try {
+ Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
+ algo.getCompressor();
+ supportedAlgos.add(algo);
+ } catch (Exception e) {
+ // this algo is not available
+ }
+ }
+
+ return supportedAlgos.toArray(new Compression.Algorithm[0]);
+ }
+
+ /**
+ * Write random values to the writer assuming a table created using
+ * {@link #FAMILIES} as column family descriptors
+ */
+ private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer, TaskAttemptContext context,
+ int numRows)
+ throws IOException, InterruptedException {
+ byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
+ int valLength = 10;
+ byte[] valBytes = new byte[valLength];
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+
+ Random random = new Random();
+ for (int i = 0; i < numRows; i++) {
+
+ Bytes.putInt(keyBytes, 0, i);
+ random.nextBytes(valBytes);
+ ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+ for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+ KeyValue kv = new KeyValue(keyBytes, family,
+ PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ writer.write(key, kv);
+ }
+ }
+ }
+
public static void main(String args[]) throws Exception {
new TestHFileOutputFormat().manualTest(args);
}
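[Editorially added example] Taken together: after this change a bulk-load job picks up each family's compression automatically. A minimal sketch of the caller side, assuming an existing table "mytable" whose column families declare compression in their descriptors:

    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "bulk-load");
    HTable table = new HTable(conf, "mytable");
    // serializes each family's algorithm into the job configuration;
    // families without an entry fall back to "hfile.compression"
    HFileOutputFormat.configureIncrementalLoad(job, table);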