You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by ch...@apache.org on 2013/08/23 05:15:20 UTC

git commit: CRUNCH-255: HFileOutputFormatForCrunch should use configuration from table for compression, block encoding, block size...

Updated Branches:
  refs/heads/master 1eae50c1a -> 6b994e3bb


CRUNCH-255: HFileOutputFormatForCrunch should use configuration from table for compression, block encoding, block size...


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6b994e3b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6b994e3b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6b994e3b

Branch: refs/heads/master
Commit: 6b994e3bb484235b9d8e62cdf13cf924c993d3fc
Parents: 1eae50c
Author: Chao Shi <ch...@apache.org>
Authored: Wed Aug 21 17:56:11 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Thu Aug 22 16:17:11 2013 +0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileTargetIT.java   | 84 +++++++++++++++++---
 .../io/hbase/HFileOutputFormatForCrunch.java    | 68 +++++++---------
 .../org/apache/crunch/io/hbase/HFileTarget.java | 44 +++++++++-
 .../org/apache/crunch/io/hbase/HFileUtils.java  | 22 ++++-
 4 files changed, 167 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 667b5ad..aac9e317 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -47,8 +47,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -107,9 +109,13 @@ public class HFileTargetIT implements Serializable {
   }
 
   private static HTable createTable(int splits) throws IOException {
+    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+    return createTable(splits, hcol);
+  }
+
+  private static HTable createTable(int splits, HColumnDescriptor hcol) throws IOException {
     byte[] tableName = Bytes.toBytes("test_table_" + tableCounter);
     HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
-    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     HTableDescriptor htable = new HTableDescriptor(tableName);
     htable.addFamily(hcol);
     admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
@@ -166,8 +172,8 @@ public class HFileTargetIT implements Serializable {
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
-    PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
-    pipeline.write(wordCountKvs, ToHBase.hfile(outputPath));
+    PCollection<KeyValue> wordCountKeyValues = convertToKeyValues(wordCounts);
+    pipeline.write(wordCountKeyValues, ToHBase.hfile(outputPath));
 
     PipelineResult result = pipeline.run();
     assertTrue(result.succeeded());
@@ -188,9 +194,9 @@ public class HFileTargetIT implements Serializable {
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
     PCollection<String> words = split(shakespeare, "\\s+");
     PTable<String,Long> wordCounts = words.count();
-    PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
-    HFileUtils.writeToHFilesForIncrementalLoad(
-        wordCountKvs,
+    PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        wordCountPuts,
         testTable,
         outputPath);
 
@@ -231,12 +237,12 @@ public class HFileTargetIT implements Serializable {
     PCollection<String> longWords = words.filter(FilterFns.not(SHORT_WORD_FILTER));
     PTable<String, Long> shortWordCounts = shortWords.count();
     PTable<String, Long> longWordCounts = longWords.count();
-    HFileUtils.writeToHFilesForIncrementalLoad(
-        convertToKeyValues(shortWordCounts),
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        convertToPuts(shortWordCounts),
         table1,
         outputPath1);
-    HFileUtils.writeToHFilesForIncrementalLoad(
-        convertToKeyValues(longWordCounts),
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        convertToPuts(longWordCounts),
         table2,
         outputPath2);
 
@@ -245,11 +251,67 @@ public class HFileTargetIT implements Serializable {
     loader.doBulkLoad(outputPath1, table1);
     loader.doBulkLoad(outputPath2, table2);
 
-    FileSystem fs = FileSystem.get(conf);
     assertEquals(396L, getWordCountFromTable(table1, "of"));
     assertEquals(427L, getWordCountFromTable(table2, "and"));
   }
 
+  /** Checks that HFiles written for a table use the data block encoding set on its column family. */
+  @Test
+  public void testHFileUsesFamilyConfig() throws IOException {
+    DataBlockEncoding newBlockEncoding = DataBlockEncoding.PREFIX;
+    assertTrue(newBlockEncoding != DataBlockEncoding.valueOf(HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING));
+
+    Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+    Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
+    Path inputPath = copyResourceFileToHDFS("shakes.txt");
+    Path outputPath = getTempPathOnHDFS("out");
+    HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+    hcol.setDataBlockEncoding(newBlockEncoding);
+    HTable testTable = createTable(10, hcol);
+
+    PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<String> words = split(shakespeare, "\\s+");
+    PTable<String,Long> wordCounts = words.count();
+    PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+    HFileUtils.writePutsToHFilesForIncrementalLoad(
+        wordCountPuts,
+        testTable,
+        outputPath);
+
+    PipelineResult result = pipeline.run();
+    assertTrue(result.succeeded());
+
+    int hfilesCount = 0;
+    FileSystem fs = outputPath.getFileSystem(conf);
+    for (FileStatus e : fs.listStatus(new Path(outputPath, Bytes.toString(TEST_FAMILY)))) {
+      Path f = e.getPath();
+      if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
+        continue;
+      }
+      HFile.Reader reader = HFile.createReader(fs, f, new CacheConfig(conf)); // opened outside try: nothing to close if this throws
+      try {
+        assertEquals(newBlockEncoding, reader.getEncodingOnDisk());
+      } finally {
+        reader.close();
+      }
+      hfilesCount++;
+    }
+    assertTrue(hfilesCount > 0);
+  }
+
+  /** Maps each (word, count) pair to a Put keyed by the word, storing the count under the test column. */
+  private PCollection<Put> convertToPuts(PTable<String, Long> in) {
+    return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
+      @Override
+      public Put map(Pair<String, Long> input) {
+        long count = input.second();
+        Put put = new Put(Bytes.toBytes(input.first()));
+        put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(count));
+        return put;
+      }
+    }, Writables.writables(Put.class));
+  }
+
   private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
     return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
       @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 311d91c..70f10d5 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,17 +19,18 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 
 /**
@@ -53,13 +56,10 @@ import java.io.IOException;
  */
 public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> {
 
-  private static final String COMPACTION_EXCLUDE_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.compaction.exclude";
-  private static final String DATABLOCK_ENCODING_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
-  private static final String BLOCK_SIZE_CONF_KEY =
-      "hbase.mapreduce.hfileoutputformat.blocksize";
-  private static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+  public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor";
+  private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude";
+  private static final Log LOG = LogFactory.getLog(HFileOutputFormatForCrunch.class);
+
   private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
   private final TimeRangeTracker trt = new TimeRangeTracker();
 
@@ -68,20 +68,30 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
     Path outputPath = getDefaultWorkFile(context, "");
     Configuration conf = context.getConfiguration();
     FileSystem fs = outputPath.getFileSystem(conf);
-    int blocksize = conf.getInt(BLOCK_SIZE_CONF_KEY,
-        HFile.DEFAULT_BLOCKSIZE);
-    String compression = conf.get(
-        COMPRESSION_CONF_KEY, Compression.Algorithm.NONE.getName());
+
     final boolean compactionExclude = conf.getBoolean(
         COMPACTION_EXCLUDE_CONF_KEY, false);
-    HFileDataBlockEncoder encoder = getDataBlockEncoder(
-        conf.get(DATABLOCK_ENCODING_CONF_KEY));
+
+    String hcolStr = conf.get(HCOLUMN_DESCRIPTOR_KEY);
+    if (hcolStr == null) {
+      throw new AssertionError(HCOLUMN_DESCRIPTOR_KEY + " is not set in conf");
+    }
+    byte[] hcolBytes;
+    try {
+      hcolBytes = Hex.decodeHex(hcolStr.toCharArray());
+    } catch (DecoderException e) {
+      throw new AssertionError("Bad hex string: " + hcolStr);
+    }
+    HColumnDescriptor hcol = new HColumnDescriptor();
+    hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes)));
+    LOG.info("Output path: " + outputPath);
+    LOG.info("HColumnDescriptor: " + hcol.toString());
     final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
         .withPath(fs, outputPath)
-        .withBlockSize(blocksize)
-        .withCompression(compression)
+        .withBlockSize(hcol.getBlocksize())
+        .withCompression(hcol.getCompression())
         .withComparator(KeyValue.KEY_COMPARATOR)
-        .withDataBlockEncoder(encoder)
+        .withDataBlockEncoder(new HFileDataBlockEncoderImpl(hcol.getDataBlockEncoding()))
         .withChecksumType(Store.getChecksumType(conf))
         .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
         .create();
@@ -112,22 +122,4 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
       }
     };
   }
-
-  private HFileDataBlockEncoder getDataBlockEncoder(String dataBlockEncodingStr) {
-    final HFileDataBlockEncoder encoder;
-    if (dataBlockEncodingStr == null) {
-      encoder = NoOpDataBlockEncoder.INSTANCE;
-    } else {
-      try {
-        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
-            .valueOf(dataBlockEncodingStr));
-      } catch (IllegalArgumentException ex) {
-        throw new RuntimeException(
-            "Invalid data block encoding type configured for the param "
-                + DATABLOCK_ENCODING_CONF_KEY + " : "
-                + dataBlockEncodingStr);
-      }
-    }
-    return encoder;
-  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 0038394..bc51b2c 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,20 +17,62 @@
  */
 package org.apache.crunch.io.hbase;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class HFileTarget extends FileTargetImpl {
 
-  // TODO(chaoshi): configurable compression algorithm, block size, data block encoder for hfile...
+  private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor();
+  private final HColumnDescriptor hcol;
 
   public HFileTarget(String path) {
     this(new Path(path));
   }
 
   public HFileTarget(Path path) {
+    this(path, DEFAULT_COLUMN_DESCRIPTOR);
+  }
+
+  public HFileTarget(Path path, HColumnDescriptor hcol) {
     super(path, HFileOutputFormatForCrunch.class, new SequentialFileNamingScheme());
+    this.hcol = Preconditions.checkNotNull(hcol);
+  }
+
+  /**
+   * Sets the job's output path and registers {@link HFileOutputFormatForCrunch}, either directly on
+   * the job (when {@code name} is null) or as a named output. In both cases the hex-encoded column
+   * descriptor is stored under HCOLUMN_DESCRIPTOR_KEY; {@code outputFormatClass} is not consulted.
+   */
+  @Override
+  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+      Class outputFormatClass, Path outputPath, String name) {
+    try {
+      FileOutputFormat.setOutputPath(job, outputPath);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    String encodedDescriptor = Hex.encodeHexString(WritableUtils.toByteArray(hcol));
+    if (name != null) {
+      FormatBundle<HFileOutputFormatForCrunch> bundle = FormatBundle.forOutput(HFileOutputFormatForCrunch.class);
+      bundle.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, encodedDescriptor);
+      CrunchOutputs.addNamedOutput(job, name, bundle, keyClass, valueClass);
+      return;
+    }
+    job.setOutputFormatClass(HFileOutputFormatForCrunch.class);
+    job.setOutputKeyClass(keyClass);
+    job.setOutputValueClass(valueClass);
+    job.getConfiguration().set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, encodedDescriptor);
+  }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 5e07a67..d026555 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
@@ -34,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.RawComparator;
@@ -104,10 +107,27 @@ public final class HFileUtils {
       byte[] family = f.getName();
       PCollection<KeyValue> sorted = sortAndPartition(
           kvs.filter(new FilterByFamilyFn(family)), table);
-      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family))));
+      sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
     }
   }
 
+  /** Flattens every Put into its KeyValues and hands them to writeToHFilesForIncrementalLoad. */
+  public static void writePutsToHFilesForIncrementalLoad(
+      PCollection<Put> puts, HTable table, Path outputPath) throws IOException {
+    DoFn<Put, KeyValue> flattenPut = new DoFn<Put, KeyValue>() {
+      @Override
+      public void process(Put put, Emitter<KeyValue> out) {
+        for (List<KeyValue> familyCells : put.getFamilyMap().values()) {
+          for (KeyValue cell : familyCells) {
+            out.emit(cell);
+          }
+        }
+      }
+    };
+    PCollection<KeyValue> cells = puts.parallelDo("ConvertPutToKeyValue", flattenPut, writables(KeyValue.class));
+    writeToHFilesForIncrementalLoad(cells, table, outputPath);
+  }
+
   public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException {
     Configuration conf = kvs.getPipeline().getConfiguration();
     PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() {