Posted to commits@crunch.apache.org by ch...@apache.org on 2013/08/23 05:15:20 UTC
git commit: CRUNCH-255: HFileOutputFormatForCrunch should use configuration from table for compression, block encoding, block size...
Updated Branches:
refs/heads/master 1eae50c1a -> 6b994e3bb
CRUNCH-255: HFileOutputFormatForCrunch should use configuration from table for compression, block encoding, block size...
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6b994e3b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6b994e3b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6b994e3b
Branch: refs/heads/master
Commit: 6b994e3bb484235b9d8e62cdf13cf924c993d3fc
Parents: 1eae50c
Author: Chao Shi <ch...@apache.org>
Authored: Wed Aug 21 17:56:11 2013 +0800
Committer: Chao Shi <ch...@apache.org>
Committed: Thu Aug 22 16:17:11 2013 +0800
----------------------------------------------------------------------
.../apache/crunch/io/hbase/HFileTargetIT.java | 84 +++++++++++++++++---
.../io/hbase/HFileOutputFormatForCrunch.java | 68 +++++++---------
.../org/apache/crunch/io/hbase/HFileTarget.java | 44 +++++++++-
.../org/apache/crunch/io/hbase/HFileUtils.java | 22 ++++-
4 files changed, 167 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
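For reference, a minimal usage sketch of the Put-based helper added by this commit (not part of the commit itself; the input path, table name, family/qualifier and output path below are placeholders):

    import org.apache.crunch.MapFn;
    import org.apache.crunch.PCollection;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.io.At;
    import org.apache.crunch.io.hbase.HFileUtils;
    import org.apache.crunch.types.writable.Writables;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WordCountBulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Pipeline pipeline = new MRPipeline(WordCountBulkLoadSketch.class, conf);

        // Count words in a text file and turn each (word, count) pair into a Put.
        PTable<String, Long> counts = pipeline
            .read(At.textFile(new Path(args[0]), Writables.strings()))
            .count();
        PCollection<Put> puts = counts.parallelDo(new MapFn<Pair<String, Long>, Put>() {
          @Override
          public Put map(Pair<String, Long> input) {
            Put put = new Put(Bytes.toBytes(input.first()));
            put.add(Bytes.toBytes("f"), Bytes.toBytes("count"), Bytes.toBytes(input.second()));
            return put;
          }
        }, Writables.writables(Put.class));

        // With this change the HFiles written here pick up compression, block size
        // and data block encoding from the target table's column family descriptors.
        HTable table = new HTable(conf, args[1]);
        HFileUtils.writePutsToHFilesForIncrementalLoad(puts, table, new Path(args[2]));
        pipeline.done();
      }
    }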
http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index 667b5ad..aac9e317 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -47,8 +47,10 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -107,9 +109,13 @@ public class HFileTargetIT implements Serializable {
}
private static HTable createTable(int splits) throws IOException {
+ HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+ return createTable(splits, hcol);
+ }
+
+ private static HTable createTable(int splits, HColumnDescriptor hcol) throws IOException {
byte[] tableName = Bytes.toBytes("test_table_" + tableCounter);
HBaseAdmin admin = HBASE_TEST_UTILITY.getHBaseAdmin();
- HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
HTableDescriptor htable = new HTableDescriptor(tableName);
htable.addFamily(hcol);
admin.createTable(htable, Bytes.split(Bytes.toBytes("a"), Bytes.toBytes("z"), splits));
@@ -166,8 +172,8 @@ public class HFileTargetIT implements Serializable {
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
PTable<String,Long> wordCounts = words.count();
- PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
- pipeline.write(wordCountKvs, ToHBase.hfile(outputPath));
+ PCollection<KeyValue> wordCountKeyValues = convertToKeyValues(wordCounts);
+ pipeline.write(wordCountKeyValues, ToHBase.hfile(outputPath));
PipelineResult result = pipeline.run();
assertTrue(result.succeeded());
@@ -188,9 +194,9 @@ public class HFileTargetIT implements Serializable {
PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
PCollection<String> words = split(shakespeare, "\\s+");
PTable<String,Long> wordCounts = words.count();
- PCollection<KeyValue> wordCountKvs = convertToKeyValues(wordCounts);
- HFileUtils.writeToHFilesForIncrementalLoad(
- wordCountKvs,
+ PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+ HFileUtils.writePutsToHFilesForIncrementalLoad(
+ wordCountPuts,
testTable,
outputPath);
@@ -231,12 +237,12 @@ public class HFileTargetIT implements Serializable {
PCollection<String> longWords = words.filter(FilterFns.not(SHORT_WORD_FILTER));
PTable<String, Long> shortWordCounts = shortWords.count();
PTable<String, Long> longWordCounts = longWords.count();
- HFileUtils.writeToHFilesForIncrementalLoad(
- convertToKeyValues(shortWordCounts),
+ HFileUtils.writePutsToHFilesForIncrementalLoad(
+ convertToPuts(shortWordCounts),
table1,
outputPath1);
- HFileUtils.writeToHFilesForIncrementalLoad(
- convertToKeyValues(longWordCounts),
+ HFileUtils.writePutsToHFilesForIncrementalLoad(
+ convertToPuts(longWordCounts),
table2,
outputPath2);
@@ -245,11 +251,67 @@ public class HFileTargetIT implements Serializable {
loader.doBulkLoad(outputPath1, table1);
loader.doBulkLoad(outputPath2, table2);
- FileSystem fs = FileSystem.get(conf);
assertEquals(396L, getWordCountFromTable(table1, "of"));
assertEquals(427L, getWordCountFromTable(table2, "and"));
}
+ @Test
+ public void testHFileUsesFamilyConfig() throws IOException {
+ DataBlockEncoding newBlockEncoding = DataBlockEncoding.PREFIX;
+ assertTrue(newBlockEncoding != DataBlockEncoding.valueOf(HColumnDescriptor.DEFAULT_DATA_BLOCK_ENCODING));
+
+ Configuration conf = HBASE_TEST_UTILITY.getConfiguration();
+ Pipeline pipeline = new MRPipeline(HFileTargetIT.class, conf);
+ Path inputPath = copyResourceFileToHDFS("shakes.txt");
+ Path outputPath = getTempPathOnHDFS("out");
+ HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
+ hcol.setDataBlockEncoding(newBlockEncoding);
+ HTable testTable = createTable(10, hcol);
+
+ PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
+ PCollection<String> words = split(shakespeare, "\\s+");
+ PTable<String,Long> wordCounts = words.count();
+ PCollection<Put> wordCountPuts = convertToPuts(wordCounts);
+ HFileUtils.writePutsToHFilesForIncrementalLoad(
+ wordCountPuts,
+ testTable,
+ outputPath);
+
+ PipelineResult result = pipeline.run();
+ assertTrue(result.succeeded());
+
+ int hfilesCount = 0;
+ FileSystem fs = outputPath.getFileSystem(conf);
+ for (FileStatus e : fs.listStatus(new Path(outputPath, Bytes.toString(TEST_FAMILY)))) {
+ Path f = e.getPath();
+ if (!f.getName().startsWith("part-")) { // filter out "_SUCCESS"
+ continue;
+ }
+ HFile.Reader reader = null;
+ try {
+ reader = HFile.createReader(fs, f, new CacheConfig(conf));
+ assertEquals(DataBlockEncoding.PREFIX, reader.getEncodingOnDisk());
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ hfilesCount++;
+ }
+ assertTrue(hfilesCount > 0);
+ }
+
+ private PCollection<Put> convertToPuts(PTable<String, Long> in) {
+ return in.parallelDo(new MapFn<Pair<String, Long>, Put>() {
+ @Override
+ public Put map(Pair<String, Long> input) {
+ String w = input.first();
+ long c = input.second();
+ Put p = new Put(Bytes.toBytes(w));
+ p.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(c));
+ return p;
+ }
+ }, Writables.writables(Put.class));
+ }
+
private PCollection<KeyValue> convertToKeyValues(PTable<String, Long> in) {
return in.parallelDo(new MapFn<Pair<String, Long>, KeyValue>() {
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 311d91c..70f10d5 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -19,17 +19,18 @@
*/
package org.apache.crunch.io.hbase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
/**
@@ -53,13 +56,10 @@ import java.io.IOException;
*/
public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValue> {
- private static final String COMPACTION_EXCLUDE_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.compaction.exclude";
- private static final String DATABLOCK_ENCODING_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.datablock.encoding";
- private static final String BLOCK_SIZE_CONF_KEY =
- "hbase.mapreduce.hfileoutputformat.blocksize";
- private static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
+ public static final String HCOLUMN_DESCRIPTOR_KEY = "hbase.hfileoutputformat.column.descriptor";
+ private static final String COMPACTION_EXCLUDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.compaction.exclude";
+ private static final Log LOG = LogFactory.getLog(HFileOutputFormatForCrunch.class);
+
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
private final TimeRangeTracker trt = new TimeRangeTracker();
@@ -68,20 +68,30 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
Path outputPath = getDefaultWorkFile(context, "");
Configuration conf = context.getConfiguration();
FileSystem fs = outputPath.getFileSystem(conf);
- int blocksize = conf.getInt(BLOCK_SIZE_CONF_KEY,
- HFile.DEFAULT_BLOCKSIZE);
- String compression = conf.get(
- COMPRESSION_CONF_KEY, Compression.Algorithm.NONE.getName());
+
final boolean compactionExclude = conf.getBoolean(
COMPACTION_EXCLUDE_CONF_KEY, false);
- HFileDataBlockEncoder encoder = getDataBlockEncoder(
- conf.get(DATABLOCK_ENCODING_CONF_KEY));
+
+ String hcolStr = conf.get(HCOLUMN_DESCRIPTOR_KEY);
+ if (hcolStr == null) {
+ throw new AssertionError(HCOLUMN_DESCRIPTOR_KEY + " is not set in conf");
+ }
+ byte[] hcolBytes;
+ try {
+ hcolBytes = Hex.decodeHex(hcolStr.toCharArray());
+ } catch (DecoderException e) {
+ throw new AssertionError("Bad hex string: " + hcolStr);
+ }
+ HColumnDescriptor hcol = new HColumnDescriptor();
+ hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes)));
+ LOG.info("Output path: " + outputPath);
+ LOG.info("HColumnDescriptor: " + hcol.toString());
final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
.withPath(fs, outputPath)
- .withBlockSize(blocksize)
- .withCompression(compression)
+ .withBlockSize(hcol.getBlocksize())
+ .withCompression(hcol.getCompression())
.withComparator(KeyValue.KEY_COMPARATOR)
- .withDataBlockEncoder(encoder)
+ .withDataBlockEncoder(new HFileDataBlockEncoderImpl(hcol.getDataBlockEncoding()))
.withChecksumType(Store.getChecksumType(conf))
.withBytesPerChecksum(Store.getBytesPerChecksum(conf))
.create();
@@ -112,22 +122,4 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, KeyValu
}
};
}
-
- private HFileDataBlockEncoder getDataBlockEncoder(String dataBlockEncodingStr) {
- final HFileDataBlockEncoder encoder;
- if (dataBlockEncodingStr == null) {
- encoder = NoOpDataBlockEncoder.INSTANCE;
- } else {
- try {
- encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
- .valueOf(dataBlockEncodingStr));
- } catch (IllegalArgumentException ex) {
- throw new RuntimeException(
- "Invalid data block encoding type configured for the param "
- + DATABLOCK_ENCODING_CONF_KEY + " : "
- + dataBlockEncodingStr);
- }
- }
- return encoder;
- }
}
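The column family descriptor reaches this output format through the job configuration as a hex-encoded Writable. A standalone sketch of that round trip, mirroring the encode side in HFileTarget#configureForMapReduce and the decode side above (family name and encoding here are illustrative only):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import org.apache.commons.codec.binary.Hex;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.io.WritableUtils;

    public class HColumnDescriptorRoundTrip {
      public static void main(String[] args) throws Exception {
        HColumnDescriptor original = new HColumnDescriptor("f");
        original.setDataBlockEncoding(DataBlockEncoding.PREFIX);

        // Encode side: serialize the descriptor and store it as a hex string
        // under HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY.
        String hex = Hex.encodeHexString(WritableUtils.toByteArray(original));

        // Decode side: what getRecordWriter does before creating the HFile.Writer.
        HColumnDescriptor restored = new HColumnDescriptor();
        restored.readFields(new DataInputStream(
            new ByteArrayInputStream(Hex.decodeHex(hex.toCharArray()))));

        System.out.println(restored.getDataBlockEncoding()); // PREFIX
      }
    }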
http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index 0038394..bc51b2c 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,20 +17,62 @@
*/
package org.apache.crunch.io.hbase;
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HFileTarget extends FileTargetImpl {
- // TODO(chaoshi): configurable compression algorithm, block size, data block encoder for hfile...
+ private static final HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor();
+ private final HColumnDescriptor hcol;
public HFileTarget(String path) {
this(new Path(path));
}
public HFileTarget(Path path) {
+ this(path, DEFAULT_COLUMN_DESCRIPTOR);
+ }
+
+ public HFileTarget(Path path, HColumnDescriptor hcol) {
super(path, HFileOutputFormatForCrunch.class, new SequentialFileNamingScheme());
+ this.hcol = Preconditions.checkNotNull(hcol);
+ }
+
+ @Override
+ protected void configureForMapReduce(
+ Job job,
+ Class keyClass,
+ Class valueClass,
+ Class outputFormatClass,
+ Path outputPath,
+ String name) {
+ try {
+ FileOutputFormat.setOutputPath(job, outputPath);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ String hcolStr = Hex.encodeHexString(WritableUtils.toByteArray(hcol));
+ if (name == null) {
+ job.setOutputFormatClass(HFileOutputFormatForCrunch.class);
+ job.setOutputKeyClass(keyClass);
+ job.setOutputValueClass(valueClass);
+ job.getConfiguration().set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
+ } else {
+ FormatBundle<HFileOutputFormatForCrunch> bundle = FormatBundle.forOutput(HFileOutputFormatForCrunch.class);
+ bundle.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr);
+ CrunchOutputs.addNamedOutput(job, name, bundle, keyClass, valueClass);
+ }
}
@Override
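For callers that write a pre-sorted PCollection<KeyValue> for a single family, the new constructor can also be used directly. A hedged sketch (family name, settings and output path are placeholders, and the input is assumed to be already sorted and partitioned as HFileUtils.sortAndPartition does):

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.io.hbase.HFileTarget;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.Compression;

    public class CustomFamilyHFileWrite {
      // kvs must already be sorted and partitioned for the target family.
      public static void write(Pipeline pipeline, PCollection<KeyValue> kvs) {
        HColumnDescriptor hcol = new HColumnDescriptor("f");
        hcol.setCompressionType(Compression.Algorithm.GZ);
        hcol.setBlocksize(128 * 1024);
        hcol.setDataBlockEncoding(DataBlockEncoding.PREFIX);
        // The descriptor is serialized into the job configuration and read back by
        // HFileOutputFormatForCrunch when it creates the HFile.Writer.
        pipeline.write(kvs, new HFileTarget(new Path("/tmp/hfiles/f"), hcol));
        pipeline.run();
      }
    }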
http://git-wip-us.apache.org/repos/asf/crunch/blob/6b994e3b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
index 5e07a67..d026555 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java
@@ -21,6 +21,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
@@ -34,6 +36,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
@@ -104,10 +107,27 @@ public final class HFileUtils {
byte[] family = f.getName();
PCollection<KeyValue> sorted = sortAndPartition(
kvs.filter(new FilterByFamilyFn(family)), table);
- sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family))));
+ sorted.write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f));
}
}
+ public static void writePutsToHFilesForIncrementalLoad(
+ PCollection<Put> puts,
+ HTable table,
+ Path outputPath) throws IOException {
+ PCollection<KeyValue> kvs = puts.parallelDo("ConvertPutToKeyValue", new DoFn<Put, KeyValue>() {
+ @Override
+ public void process(Put input, Emitter<KeyValue> emitter) {
+ for (List<KeyValue> keyValues : input.getFamilyMap().values()) {
+ for (KeyValue keyValue : keyValues) {
+ emitter.emit(keyValue);
+ }
+ }
+ }
+ }, writables(KeyValue.class));
+ writeToHFilesForIncrementalLoad(kvs, table, outputPath);
+ }
+
public static PCollection<KeyValue> sortAndPartition(PCollection<KeyValue> kvs, HTable table) throws IOException {
Configuration conf = kvs.getPipeline().getConfiguration();
PTable<KeyValue, Void> t = kvs.parallelDo(new MapFn<KeyValue, Pair<KeyValue, Void>>() {