You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:16:13 UTC

svn commit: r1181518 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/io/hfile/HFile.java main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Author: nspiegelberg
Date: Tue Oct 11 02:16:13 2011
New Revision: 1181518

URL: http://svn.apache.org/viewvc?rev=1181518&view=rev
Log:
HBASE-3474 HFileOutputFormat to use column family's comp algo

Summary:
import patch for HBASE-3474. This is on 0.92 two weeks ago:

HBASE-3474 HFileOutputFormat to use column family's compression
algorithm
https://svn.apache.org/repos/asf/hbase/trunk@1085179

Test Plan:
- reviewed the code and run the unit tests for this change.

Reviewed By: kannan
Reviewers: kannan, kranganathan
CC: kannan, gqchen, hbase@lists
Revert Plan:
OK

Differential Revision: 235203

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1181518&r1=1181517&r2=1181518&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Oct 11 02:16:13 2011
@@ -1350,6 +1350,10 @@ public class HFile {
       return this.comparator;
     }
 
+    public Compression.Algorithm getCompressionAlgorithm() {
+      return this.compressAlgo;
+    }
+
     /**
      * @return index size
      */

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181518&r1=1181517&r2=1181518&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:16:13 2011
@@ -20,9 +20,13 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,7 +36,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -63,6 +69,7 @@ import org.apache.commons.logging.LogFac
  */
 public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
   static Log LOG = LogFactory.getLog(HFileOutputFormat.class);
+  static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
 
   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
   throws IOException, InterruptedException {
@@ -77,8 +84,12 @@ public class HFileOutputFormat extends F
     final int blocksize = conf.getInt("hfile.min.blocksize.size",
         HFile.DEFAULT_BLOCKSIZE);
     // Invented config.  Add to hbase-*.xml if other than default compression.
-    final String compression = conf.get("hfile.compression",
-      Compression.Algorithm.NONE.getName());
+    final String defaultCompression = conf.get("hfile.compression",
+        Compression.Algorithm.NONE.getName());
+
+    // create a map from column family to the compression algorithm
+    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
+
     final int bytesPerChecksum = HFile.getBytesPerChecksum(conf, conf);
 
     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
@@ -153,6 +164,8 @@ public class HFileOutputFormat extends F
       private WriterLength getNewWriter(byte[] family) throws IOException {
         WriterLength wl = new WriterLength();
         Path familydir = new Path(outputdir, Bytes.toString(family));
+        String compression = compressionMap.get(family);
+        compression = compression == null ? defaultCompression : compression;
         wl.writer = new HFile.Writer(fs,
           StoreFile.getUniqueFile(fs, familydir), blocksize,
           bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
@@ -300,7 +313,69 @@ public class HFileOutputFormat extends F
     DistributedCache.addCacheFile(cacheUri, conf);
     DistributedCache.createSymlink(conf);
 
+    // Set compression algorithms based on column families
+    configureCompression(table, conf);
+
     LOG.info("Incremental table output configured.");
   }
 
+  /**
+   * Run inside the task to deserialize column family to compression algorithm
+   * map from the
+   * configuration.
+   *
+   * Package-private for unit tests only.
+   *
+   * @return a map from column family to the name of the configured compression
+   *         algorithm
+   */
+  static Map<byte[], String> createFamilyCompressionMap(Configuration conf) {
+    Map<byte[], String> compressionMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
+    String compressionConf = conf.get(COMPRESSION_CONF_KEY, "");
+    for (String familyConf : compressionConf.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      if (familySplit.length != 2) {
+        continue;
+      }
+
+      try {
+        compressionMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(),
+            URLDecoder.decode(familySplit[1], "UTF-8"));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return compressionMap;
+  }
+
+  /**
+   * Serialize column family to compression algorithm map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   *
+   * Package-private for unit tests only.
+   *
+   * @throws IOException
+   *           on failure to read column family descriptors
+   */
+  static void configureCompression(HTable table, Configuration conf) throws IOException {
+    StringBuilder compressionConfigValue = new StringBuilder();
+    HTableDescriptor tableDescriptor = table.getTableDescriptor();
+    if(tableDescriptor == null){
+      // could happen with mock table instance
+      return;
+    }
+    Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+    int i = 0;
+    for (HColumnDescriptor familyDescriptor : families) {
+      if (i++ > 0) {
+        compressionConfigValue.append('&');
+      }
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
+      compressionConfigValue.append('=');
+      compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
+    }
+    // Get rid of the last ampersand
+    conf.set(COMPRESSION_CONF_KEY, compressionConfigValue.toString());
+  }
 }

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181518&r1=1181517&r2=1181518&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:16:13 2011
@@ -23,9 +23,14 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +41,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PerformanceEvaluation;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -45,6 +52,10 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -57,6 +68,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.collect.Lists;
+
 /**
  * Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
  * Sets up and runs a mapreduce job that writes hfile output.
@@ -232,14 +245,7 @@ public class TestHFileOutputFormat  {
   public void testJobConfiguration() throws Exception {
     Job job = new Job();
     HTable table = Mockito.mock(HTable.class);
-    byte[][] mockKeys = new byte[][] {
-        HConstants.EMPTY_BYTE_ARRAY,
-        Bytes.toBytes("aaa"),
-        Bytes.toBytes("ggg"),
-        Bytes.toBytes("zzz")
-    };
-    Mockito.doReturn(mockKeys).when(table).getStartKeys();
-
+    setupMockStartKeys(table);
     HFileOutputFormat.configureIncrementalLoad(job, table);
     assertEquals(job.getNumReduceTasks(), 4);
   }
@@ -373,6 +379,205 @@ public class TestHFileOutputFormat  {
     assertTrue(job.waitForCompletion(true));
   }
 
+  /**
+   * Test for
+   * {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
+   * that the compression map is correctly deserialized from configuration
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testCreateFamilyCompressionMap() throws IOException {
+    for (int numCfs = 0; numCfs <= 3; numCfs++) {
+      Configuration conf = new Configuration(this.util.getConfiguration());
+      Map<String, Compression.Algorithm> familyToCompression = getMockColumnFamilies(numCfs);
+      HTable table = Mockito.mock(HTable.class);
+      setupMockColumnFamilies(table, familyToCompression);
+      HFileOutputFormat.configureCompression(table, conf);
+
+      // read back family specific compression setting from the configuration
+      Map<byte[], String> retrievedFamilyToCompressionMap = HFileOutputFormat.createFamilyCompressionMap(conf);
+
+      // test that we have a value for all column families that matches with the
+      // used mock values
+      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
+        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(), entry.getValue()
+                     .getName(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes()));
+      }
+    }
+  }
+
+  private void setupMockColumnFamilies(HTable table,
+    Map<String, Compression.Algorithm> familyToCompression) throws IOException
+  {
+    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAME);
+    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
+      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey().getBytes(), 1, entry.getValue().getName(),
+                                                          false, false, 0, "none"));
+    }
+    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
+  }
+
+  private void setupMockStartKeys(HTable table) throws IOException {
+    byte[][] mockKeys = new byte[][] {
+        HConstants.EMPTY_BYTE_ARRAY,
+        Bytes.toBytes("aaa"),
+        Bytes.toBytes("ggg"),
+        Bytes.toBytes("zzz")
+    };
+    Mockito.doReturn(mockKeys).when(table).getStartKeys();
+  }
+
+  /**
+   * @return a map from column family names to compression algorithms for
+   *         testing column family compression. Column family names have special characters
+   */
+  private Map<String, Compression.Algorithm> getMockColumnFamilies(int numCfs) {
+    Map<String, Compression.Algorithm> familyToCompression = new HashMap<String, Compression.Algorithm>();
+    // use column family names having special characters
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
+    }
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
+    }
+    if (numCfs-- > 0) {
+      familyToCompression.put("Family3", Compression.Algorithm.NONE);
+    }
+    return familyToCompression;
+  }
+
+  /**
+   * Test that {@link HFileOutputFormat} RecordWriter uses compression settings
+   * from the column family descriptor
+   */
+  @Test
+  public void testColumnFamilyCompression()
+      throws IOException, InterruptedException {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
+    TaskAttemptContext context = null;
+    Path dir =
+        HBaseTestingUtility.getTestDir("testColumnFamilyCompression");
+
+    HTable table = Mockito.mock(HTable.class);
+
+    Map<String, Compression.Algorithm> configuredCompression =
+      new HashMap<String, Compression.Algorithm>();
+    Compression.Algorithm[] supportedAlgos = getSupportedCompressionAlgorithms();
+
+    int familyIndex = 0;
+    for (byte[] family : FAMILIES) {
+      configuredCompression.put(Bytes.toString(family),
+                                supportedAlgos[familyIndex++ % supportedAlgos.length]);
+    }
+    setupMockColumnFamilies(table, configuredCompression);
+
+    // set up the table to return some mock keys
+    setupMockStartKeys(table);
+
+    try {
+      // partial map red setup to get an operational writer for testing
+      Job job = new Job(conf, "testLocalMRIncrementalLoad");
+      setupRandomGeneratorMapper(job);
+      HFileOutputFormat.configureIncrementalLoad(job, table);
+      FileOutputFormat.setOutputPath(job, dir);
+      context = new TaskAttemptContext(job.getConfiguration(),
+          new TaskAttemptID());
+      HFileOutputFormat hof = new HFileOutputFormat();
+      writer = hof.getRecordWriter(context);
+
+      // write out random rows
+      writeRandomKeyValues(writer, context, ROWSPERSPLIT);
+      writer.close(context);
+
+      // Make sure that a directory was created for every CF
+      FileSystem fileSystem = dir.getFileSystem(conf);
+
+      // commit so that the filesystem has one directory per column family
+      hof.getOutputCommitter(context).commitTask(context);
+      for (byte[] family : FAMILIES) {
+        String familyStr = new String(family);
+        boolean found = false;
+        for (FileStatus f : fileSystem.listStatus(dir)) {
+
+          if (Bytes.toString(family).equals(f.getPath().getName())) {
+            // we found a matching directory
+            found = true;
+
+            // verify that the compression on this file matches the configured
+            // compression
+            Path dataFilePath = fileSystem.listStatus(f.getPath())[0].getPath();
+            Reader reader = new HFile.Reader(fileSystem, dataFilePath, null, false, true);
+            reader.loadFileInfo();
+            assertEquals("Incorrect compression used for column family " + familyStr
+                         + "(reader: " + reader + ")",
+                         configuredCompression.get(familyStr), reader.getCompressionAlgorithm());
+            break;
+          }
+        }
+
+        if (!found) {
+          fail("HFile for column family " + familyStr + " not found");
+        }
+      }
+
+    } finally {
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
+
+  /**
+   * @return
+   */
+  private Compression.Algorithm[] getSupportedCompressionAlgorithms() {
+    String[] allAlgos = HFile.getSupportedCompressionAlgorithms();
+    List<Compression.Algorithm> supportedAlgos = Lists.newArrayList();
+
+    for (String algoName : allAlgos) {
+      try {
+        Compression.Algorithm algo = Compression.getCompressionAlgorithmByName(algoName);
+        algo.getCompressor();
+        supportedAlgos.add(algo);
+      }catch (Exception e) {
+        // this algo is not available
+      }
+    }
+
+    return supportedAlgos.toArray(new Compression.Algorithm[0]);
+  }
+
+
+  /**
+   * Write random values to the writer assuming a table created using
+   * {@link #FAMILIES} as column family descriptors
+   */
+  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, KeyValue> writer, TaskAttemptContext context,
+      int numRows)
+      throws IOException, InterruptedException {
+    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
+    int valLength = 10;
+    byte valBytes[] = new byte[valLength];
+
+    int taskId = context.getTaskAttemptID().getTaskID().getId();
+    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+    Random random = new Random();
+    for (int i = 0; i < numRows; i++) {
+
+      Bytes.putInt(keyBytes, 0, i);
+      random.nextBytes(valBytes);
+      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        writer.write(key, kv);
+      }
+    }
+  }
+
   public static void main(String args[]) throws Exception {
     new TestHFileOutputFormat().manualTest(args);
   }