You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2016/05/12 19:15:09 UTC

crunch git commit: CRUNCH-608 Write Bloom filters in HFiles

Repository: crunch
Updated Branches:
  refs/heads/master 49e457559 -> c09c4ee2d


CRUNCH-608 Write Bloom filters in HFiles

Use a StoreFile.Writer (instead of an HFile.Writer) to write HFiles,
configured with the column family's Bloom filter type, so that Bloom
filter data is also included in the written HFiles.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c09c4ee2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c09c4ee2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c09c4ee2

Branch: refs/heads/master
Commit: c09c4ee2de992b50c15d2cb91a3e6e22c88fb0b1
Parents: 49e4575
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue May 10 11:02:11 2016 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Tue May 10 11:02:11 2016 +0200

----------------------------------------------------------------------
 .../org/apache/crunch/io/hbase/HFileTargetIT.java   | 16 ++++++++++++++++
 .../crunch/io/hbase/HFileOutputFormatForCrunch.java | 11 ++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/c09c4ee2/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
index c78ae75..af24865 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileTargetIT.java
@@ -66,10 +66,14 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.KeyValueHeap;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.ByteBloomFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -81,6 +85,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.BufferedReader;
+import java.io.DataInput;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -96,7 +101,9 @@ import java.util.Random;
 import static org.apache.crunch.types.writable.Writables.nulls;
 import static org.apache.crunch.types.writable.Writables.tableOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -277,6 +284,7 @@ public class HFileTargetIT implements Serializable {
     Path outputPath = getTempPathOnHDFS("out");
     HColumnDescriptor hcol = new HColumnDescriptor(TEST_FAMILY);
     hcol.setDataBlockEncoding(newBlockEncoding);
+    hcol.setBloomFilterType(BloomType.ROWCOL);
     HTable testTable = createTable(26, hcol);
 
     PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, Writables.strings()));
@@ -303,6 +311,14 @@ public class HFileTargetIT implements Serializable {
       try {
         reader = HFile.createReader(fs, f, new CacheConfig(conf), conf);
         assertEquals(DataBlockEncoding.PREFIX, reader.getDataBlockEncoding());
+
+        BloomType bloomFilterType = BloomType.valueOf(Bytes.toString(
+            reader.loadFileInfo().get(StoreFile.BLOOM_FILTER_TYPE_KEY)));
+        assertEquals(BloomType.ROWCOL, bloomFilterType);
+        DataInput bloomMeta = reader.getGeneralBloomFilterMetadata();
+        assertNotNull(bloomMeta);
+        BloomFilter bloomFilter = BloomFilterFactory.createFromMeta(bloomMeta, reader);
+        assertNotNull(bloomFilter);
       } finally {
         if (reader != null) {
           reader.close();

http://git-wip-us.apache.org/repos/asf/crunch/blob/c09c4ee2/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
index 7611235..0b6ae2f 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -89,11 +91,14 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> {
     hcol.readFields(new DataInputStream(new ByteArrayInputStream(hcolBytes)));
     LOG.info("Output path: {}", outputPath);
     LOG.info("HColumnDescriptor: {}", hcol.toString());
-    final HFile.Writer writer = HFile.getWriterFactoryNoCache(conf)
-        .withPath(fs, outputPath)
+    Configuration noCacheConf = new Configuration(conf);
+    noCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+    final StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(noCacheConf), fs)
         .withComparator(KeyValue.COMPARATOR)
         .withFileContext(getContext(hcol))
-        .create();
+        .withFilePath(outputPath)
+        .withBloomType(hcol.getBloomFilterType())
+        .build();
 
     return new RecordWriter<Object, Cell>() {
       @Override