You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/09 20:31:36 UTC
git commit: CRUNCH-261: Make HBase sources readable.
Updated Branches:
refs/heads/master 9a63b294c -> 35136cb4e
CRUNCH-261: Make HBase sources readable.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/35136cb4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/35136cb4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/35136cb4
Branch: refs/heads/master
Commit: 35136cb4eaf45b6e4c69932d4a1dca04a68ce5c6
Parents: 9a63b29
Author: Josh Wills <jw...@apache.org>
Authored: Sat Sep 7 13:22:45 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Sep 9 10:55:50 2013 -0700
----------------------------------------------------------------------
.../apache/crunch/io/hbase/HFileSourceIT.java | 16 +++-
.../crunch/io/hbase/WordCountHBaseIT.java | 5 ++
.../crunch/io/hbase/HBaseSourceTarget.java | 72 +++++++++++++++-
.../crunch/io/hbase/HFileReaderFactory.java | 88 ++++++++++++++++++++
.../org/apache/crunch/io/hbase/HFileSource.java | 8 +-
5 files changed, 185 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
index 61e7663..9363aba 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/HFileSourceIT.java
@@ -81,7 +81,6 @@ public class HFileSourceIT implements Serializable {
conf = tmpDir.getDefaultConfiguration();
}
- @Test
public void testHFileSource() throws IOException {
List<KeyValue> kvs = generateKeyValues(100);
Path inputPath = tmpDir.getPath("in");
@@ -108,6 +107,13 @@ public class HFileSourceIT implements Serializable {
}
@Test
+ public void testReadHFile() throws IOException {
+ // Round trip: every generated key-value should come back unchanged and in order.
+ List<KeyValue> kvs = generateKeyValues(100);
+ assertEquals(kvs, doTestReadHFiles(kvs));
+ }
+
+ @Test
public void testScanHFiles() throws IOException {
List<KeyValue> kvs = ImmutableList.of(
new KeyValue(ROW1, FAMILY1, QUALIFIER1, 0, VALUE1),
@@ -196,6 +202,16 @@ public class HFileSourceIT implements Serializable {
return ImmutableList.copyOf(results.materialize());
}
+ /** Writes {@code kvs} to an HFile, reads it back via {@code FromHBase.hfile} and materializes it. */
+ private List<KeyValue> doTestReadHFiles(List<KeyValue> kvs) throws IOException {
+ Path inputPath = tmpDir.getPath("in");
+ writeKeyValuesToHFile(inputPath, kvs);
+
+ Pipeline pipeline = new MRPipeline(HFileSourceIT.class, conf);
+ PCollection<KeyValue> results = pipeline.read(FromHBase.hfile(inputPath));
+ return ImmutableList.copyOf(results.materialize());
+ }
+
private List<KeyValue> generateKeyValues(int count) {
List<KeyValue> kvs = Lists.newArrayList();
for (int i = 0; i < count; i++) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index 6375ea1..149e359 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -26,6 +26,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.Map;
import java.util.Random;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
@@ -243,6 +244,10 @@ public class WordCountHBaseIT {
scan.addFamily(WORD_COLFAM);
HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
+
+ Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
+ assertEquals(3, materialized.size());
+
PCollection<Put> puts = wordCount(words);
pipeline.write(puts, new HBaseTarget(outputTableName));
pipeline.write(puts, new HBaseTarget(otherTableName));
http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 71e752a..c003e48 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -20,16 +20,17 @@ package org.apache.crunch.io.hbase;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Iterator;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.Pair;
-import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
@@ -37,7 +38,9 @@ import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
@@ -45,7 +48,8 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.mapreduce.Job;
-public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<ImmutableBytesWritable, Result>>,
+public class HBaseSourceTarget extends HBaseTarget implements
+ ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>,
TableSource<ImmutableBytesWritable, Result> {
private static final Log LOG = LogFactory.getLog(HBaseSourceTarget.class);
@@ -134,4 +138,105 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
public Converter<?, ?, ?, ?> getConverter() {
return PTYPE.getConverter();
}
+
+ /**
+ * Reads the rows selected by this source's {@link Scan} straight from the HBase table, without
+ * running a MapReduce job.
+ *
+ * <p>The returned iterable wraps a single {@link HTable}, which is closed as soon as an iterator
+ * is exhausted or fails to open, so the iterable is intended to be iterated only once.
+ *
+ * @param conf base configuration, passed to {@link HBaseConfiguration#create(Configuration)}
+ * @return the (row key, row contents) pairs produced by the scan
+ * @throws IOException if the table cannot be opened
+ */
+ @Override
+ public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
+ Configuration hconf = HBaseConfiguration.create(conf);
+ HTable htable = new HTable(hconf, table);
+ return new HTableIterable(htable, scan);
+ }
+
+ /** Closes {@code htable}, logging rather than propagating any failure. */
+ private static void closeTable(HTable htable) {
+ try {
+ htable.close();
+ } catch (IOException e) {
+ // getTableName() returns byte[]; decode it rather than logging the array reference.
+ LOG.error("Exception closing HTable: "
+ + org.apache.hadoop.hbase.util.Bytes.toString(htable.getTableName()), e);
+ }
+ }
+
+ /** Opens a {@link ResultScanner} over the wrapped, already-open table when iterated. */
+ private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
+ private final HTable table;
+ private final Scan scan;
+
+ public HTableIterable(HTable table, Scan scan) {
+ this.table = table;
+ this.scan = scan;
+ }
+
+ @Override
+ public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
+ ResultScanner scanner;
+ try {
+ scanner = table.getScanner(scan);
+ } catch (IOException e) {
+ // No iterator will exist to release the table, so release it here.
+ closeTable(table);
+ throw new RuntimeException(e);
+ }
+ return new HTableIterator(table, scanner);
+ }
+ }
+
+ /**
+ * Adapts a {@link ResultScanner} to (row key, result) pairs, closing the scanner and its table
+ * once the scan is exhausted.
+ */
+ private static class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
+
+ private final HTable table;
+ private final ResultScanner scanner;
+ private final Iterator<Result> iter;
+ private boolean closed = false;
+
+ public HTableIterator(HTable table, ResultScanner scanner) {
+ this.table = table;
+ this.scanner = scanner;
+ this.iter = scanner.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (closed) {
+ return false;
+ }
+ boolean hasNext = iter.hasNext();
+ if (!hasNext) {
+ // Release resources exactly once, even if hasNext() is called again after exhaustion.
+ closed = true;
+ scanner.close();
+ closeTable(table);
+ }
+ return hasNext;
+ }
+
+ @Override
+ public Pair<ImmutableBytesWritable, Result> next() {
+ // Routing through hasNext() enforces the Iterator contract and triggers cleanup at the end.
+ if (!hasNext()) {
+ throw new java.util.NoSuchElementException();
+ }
+ Result next = iter.next();
+ return Pair.of(new ImmutableBytesWritable(next.getRow()), next);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
new file mode 100644
index 0000000..f5db516
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileReaderFactory.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link FileReaderFactory} that iterates over the {@link KeyValue}s stored in a single HFile.
+ *
+ * <p>The scanner's block-caching and pread flags are taken from the file system's configuration
+ * ({@link #HFILE_SCANNER_CACHE_BLOCKS} and {@link #HFILE_SCANNER_PREAD}); both default to
+ * {@code false}.
+ */
+public class HFileReaderFactory implements FileReaderFactory<KeyValue> {
+
+ private static final Log LOG = LogFactory.getLog(HFileReaderFactory.class);
+
+ /** Passed as the {@code cacheBlocks} flag of {@link HFile.Reader#getScanner}; defaults to false. */
+ public static final String HFILE_SCANNER_CACHE_BLOCKS = "crunch.hfile.scanner.cache.blocks";
+ /** Passed as the {@code pread} flag of {@link HFile.Reader#getScanner}; defaults to false. */
+ public static final String HFILE_SCANNER_PREAD = "crunch.hfile.scanner.pread";
+
+ /**
+ * Opens {@code path} and returns an iterator over its key-values in the order they are stored.
+ *
+ * <p>The underlying {@link HFile.Reader} is closed once the iterator is exhausted (immediately,
+ * for an empty file); a caller that stops iterating early leaves it open.
+ *
+ * @throws RuntimeException wrapping any {@link IOException} raised while opening or positioning
+ * the file
+ */
+ @Override
+ public Iterator<KeyValue> read(FileSystem fs, Path path) {
+ Configuration conf = fs.getConf();
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ HFile.Reader hfr;
+ try {
+ hfr = HFile.createReader(fs, path, cacheConfig);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not open HFile " + path, e);
+ }
+ try {
+ HFileScanner scanner = hfr.getScanner(
+ conf.getBoolean(HFILE_SCANNER_CACHE_BLOCKS, false),
+ conf.getBoolean(HFILE_SCANNER_PREAD, false));
+ // seekTo() returns false when the file holds no key-values.
+ boolean positioned = scanner.seekTo();
+ return new HFileIterator(hfr, scanner, positioned);
+ } catch (IOException e) {
+ // The iterator was never created, so it cannot close the reader for us.
+ closeQuietly(hfr);
+ throw new RuntimeException("Could not scan HFile " + path, e);
+ }
+ }
+
+ /** Closes {@code reader}, logging rather than propagating any failure. */
+ private static void closeQuietly(HFile.Reader reader) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.warn("Exception closing HFile reader", e);
+ }
+ }
+
+ /** Walks a {@link HFileScanner} from its first key-value, closing the reader at the end of the file. */
+ private static class HFileIterator implements Iterator<KeyValue> {
+
+ private final HFile.Reader reader;
+ private final HFileScanner scanner;
+ private KeyValue curr;
+
+ public HFileIterator(HFile.Reader reader, HFileScanner scanner, boolean positioned) {
+ this.reader = reader;
+ this.scanner = scanner;
+ this.curr = positioned ? scanner.getKeyValue() : null;
+ if (curr == null) {
+ closeQuietly(reader);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return curr != null;
+ }
+
+ @Override
+ public KeyValue next() {
+ if (curr == null) {
+ throw new NoSuchElementException();
+ }
+ KeyValue ret = curr;
+ try {
+ curr = scanner.next() ? scanner.getKeyValue() : null;
+ } catch (IOException e) {
+ curr = null;
+ closeQuietly(reader);
+ throw new RuntimeException(e);
+ }
+ if (curr == null) {
+ // Reached the end of the file: release the reader.
+ closeQuietly(reader);
+ }
+ return ret;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("HFileIterator is read-only");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/35136cb4/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
index 7fa7280..c1d15a2 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileSource.java
@@ -66,7 +66,17 @@ public class HFileSource extends FileSourceImpl<KeyValue> implements ReadableSou
@Override
public Iterable<KeyValue> read(Configuration conf) throws IOException {
- throw new UnsupportedOperationException("HFileSource#read(Configuration) is not implemented yet");
+ // Configure a private copy so the caller's Configuration is left untouched.
+ Configuration readConf = new Configuration(conf);
+ inputBundle.configure(readConf);
+ // HFileReaderFactory iterates over every key-value in the file, so it cannot honour the
+ // start/stop row keys; reject them instead of silently returning the whole file.
+ boolean rowRangeRequested = readConf.get(HFileInputFormat.START_ROW_KEY) != null
+ || readConf.get(HFileInputFormat.STOP_ROW_KEY) != null;
+ if (rowRangeRequested) {
+ throw new IllegalStateException("Cannot filter row ranges in HFileSource.read");
+ }
+ return read(readConf, new HFileReaderFactory());
}
@Override