You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/09 20:31:36 UTC

git commit: CRUNCH-261: Make HBase sources readable.

Updated Branches:
  refs/heads/master 9a63b294c -> 35136cb4e


CRUNCH-261: Make HBase sources readable.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/35136cb4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/35136cb4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/35136cb4

Branch: refs/heads/master
Commit: 35136cb4eaf45b6e4c69932d4a1dca04a68ce5c6
Parents: 9a63b29
Author: Josh Wills <jw...@apache.org>
Authored: Sat Sep 7 13:22:45 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Sep 9 10:55:50 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/HFileSourceIT.java   | 16 +++-
 .../crunch/io/hbase/WordCountHBaseIT.java       |  5 ++
 .../crunch/io/hbase/HBaseSourceTarget.java      | 72 +++++++++++++++-
 .../crunch/io/hbase/HFileReaderFactory.java     | 88 ++++++++++++++++++++
 .../org/apache/crunch/io/hbase/HFileSource.java |  8 +-
 5 files changed, 185 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
index 61e7663..9363aba 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -81,7 +81,6 @@ public class HFileSourceIT implements Serializable {
     conf = tmpDir.getDefaultConfiguration();
   }
 
-  @Test
   public void testHFileSource() throws IOException {
     List<KeyValue> kvs = generateKeyValues(100);
     Path inputPath = tmpDir.getPath("in");
@@ -108,6 +107,12 @@ public class HFileSourceIT implements Serializable {
   }
 
   @Test
+  public void testReadHFile() throws Exception {
+    List<KeyValue> kvs = generateKeyValues(100);
+    assertEquals(kvs, doTestReadHFiles(kvs, new Scan()));
+  }
+
+  @Test
   public void testScanHFiles() throws IOException {
     List<KeyValue> kvs = ImmutableList.of(
         new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
@@ -196,6 +201,15 @@ public class HFileSourceIT implements Serializable {
     return ImmutableList.copyOf(results.materialize());
   }
 
+  private List<KeyValue> doTestReadHFiles(List<KeyValue> kvs, Scan scan) throws IOException {
+    Path inputPath = tmpDir.getPath("in");
+    writeKeyValuesToHFile(inputPath, kvs);
+
+    Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+    PCollection<KeyValue> results = pipeline.read(FromHBase.hfile(inputPath));
+    return ImmutableList.copyOf(results.materialize());
+  }
+
   private List<KeyValue> generateKeyValues(int count) {
     List<KeyValue> kvs = Lists.newArrayList();
     for (int i = 0; i < count; i++) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 6375ea1..149e359 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Map;
 import java.util.Random;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
@@ -243,6 +244,10 @@ public class WordCountHBaseIT {
       scan.addFamily(WORD_COLFAM);
       HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
       PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
+
+      Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
+      assertEquals(3, materialized.size());
+
       PCollection<Put> puts = wordCount(words);
       pipeline.write(puts, new HBaseTarget(outputTableName));
       pipeline.write(puts, new HBaseTarget(otherTableName));

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 71e752a..c003e48 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -20,16 +20,17 @@ package org.apache.crunch.io.hbase;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
 import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSourceTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -37,7 +38,9 @@ import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -45,7 +48,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.mapreduce.Job;
 
-public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
+public class HBaseSourceTarget extends HBaseTarget implements
+    ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>,
     TableSource<ImmutableBytesWritable, Result> {
 
   private static final Log LOG = LogFactory.getLog(HBaseSourceTarget.class);
@@ -134,4 +138,68 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
   public Converter<?, ?, ?, ?> getConverter() {
     return PTYPE.getConverter();
   }
+
+  @Override
+  public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
+    Configuration hconf = HBaseConfiguration.create(conf);
+    HTable htable = new HTable(hconf, table);
+    return new HTableIterable(htable, scan);
+  }
+
+  private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
+    private final HTable table;
+    private final Scan scan;
+
+    public HTableIterable(HTable table, Scan scan) {
+      this.table = table;
+      this.scan = scan;
+    }
+
+    @Override
+    public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
+      try {
+        return new HTableIterator(table, table.getScanner(scan));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
+
+    private final HTable table;
+    private final ResultScanner scanner;
+    private final Iterator<Result> iter;
+
+    public HTableIterator(HTable table, ResultScanner scanner) {
+      this.table = table;
+      this.scanner = scanner;
+      this.iter = scanner.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      boolean hasNext = iter.hasNext();
+      if (!hasNext) {
+        scanner.close();
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.error("Exception closing HTable: " + table.getTableName(), e);
+        }
+      }
+      return hasNext;
+    }
+
+    @Override
+    public Pair<ImmutableBytesWritable, Result> next() {
+      Result next = iter.next();
+      return Pair.of(new ImmutableBytesWritable(next.getRow()), next);
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
new file mode 100644
index 0000000..f5db516
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
+
+  public static final String HFILE_SCANNER_CACHE_BLOCKS = "crunch.hfile.scanner.cache.blocks";
+  public static final String HFILE_SCANNER_PREAD = "crunch.hfile.scanner.pread";
+
+  @Override
+  public Iterator<KeyValue> read(FileSystem fs, Path path) {
+    Configuration conf = fs.getConf();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    try {
+      HFile.Reader hfr = HFile.createReader(fs, path, cacheConfig);
+      HFileScanner scanner = hfr.getScanner(
+          conf.getBoolean(HFILE_SCANNER_CACHE_BLOCKS, false),
+          conf.getBoolean(HFILE_SCANNER_PREAD, false));
+      scanner.seekTo();
+      return new HFileIterator(scanner);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static class HFileIterator implements Iterator<KeyValue> {
+
+    private final HFileScanner scanner;
+    private KeyValue curr;
+
+    public HFileIterator(HFileScanner scanner) {
+      this.scanner = scanner;
+      this.curr = scanner.getKeyValue();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return curr != null;
+    }
+
+    @Override
+    public KeyValue next() {
+      KeyValue ret = curr;
+      try {
+        if (scanner.next()) {
+          curr = scanner.getKeyValue();
+        } else {
+          curr = null;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return ret;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("HFileIterator is read-only");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index 7fa7280..c1d15a2 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -66,7 +66,13 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
 
   @Override
   public Iterable<KeyValue> read(Configuration conf) throws IOException {
-    throw new UnsupportedOperationException("HFileSource#read(Configuration) is not implemented yet");
+    conf = new Configuration(conf);
+    inputBundle.configure(conf);
+    if (conf.get(HFileInputFormat.START_ROW_KEY) != null ||
+        conf.get(HFileInputFormat.STOP_ROW_KEY) != null) {
+      throw new IllegalStateException("Cannot filter row ranges in HFileSource.read");
+    }
+    return read(conf, new HFileReaderFactory());
   }
 
   @Override