You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by mk...@apache.org on 2014/05/28 22:37:01 UTC
git commit: CRUNCH-387: Added support for the 0.8 code stream to
support multiple HBase Scans
Repository: crunch
Updated Branches:
refs/heads/apache-crunch-0.8 dbec907af -> 8fe96d6cc
CRUNCH-387: Added support for the 0.8 code stream to support multiple HBase Scans
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8fe96d6c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8fe96d6c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8fe96d6c
Branch: refs/heads/apache-crunch-0.8
Commit: 8fe96d6ccb7bb53a5fa0c3f011f3e6e6e8133838
Parents: dbec907
Author: Micah Whitacre <mk...@apache.org>
Authored: Wed May 28 10:57:31 2014 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Wed May 28 12:09:06 2014 -0500
----------------------------------------------------------------------
.../crunch/io/hbase/WordCountHBaseIT.java | 13 +++++-
.../org/apache/crunch/io/hbase/HBaseData.java | 17 +++++---
.../crunch/io/hbase/HBaseSourceTarget.java | 43 +++++++++++++-------
.../apache/crunch/io/hbase/HTableIterable.java | 13 +++---
.../apache/crunch/io/hbase/HTableIterator.java | 35 ++++++++++++----
pom.xml | 2 +-
6 files changed, 85 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index af32c1a..2bbb70b 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -69,6 +69,8 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.ByteStreams;
+import javax.ws.rs.HEAD;
+
public class WordCountHBaseIT {
static class StringifyFn extends MapFn<Pair<ImmutableBytesWritable, Pair<Result, Result>>, String> {
@@ -237,9 +239,18 @@ public class WordCountHBaseIT {
key = put(inputTable, key, "cat");
key = put(inputTable, key, "cat");
key = put(inputTable, key, "dog");
+ inputTable.flushCommits();
+
+ //Set up the scan using multiple scans that simply cut the rows in half.
Scan scan = new Scan();
scan.addFamily(WORD_COLFAM);
- HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
+ byte[] cutoffPoint = Bytes.toBytes(2);
+ scan.setStopRow(cutoffPoint);
+ Scan scan2 = new Scan();
+ scan2.addFamily(WORD_COLFAM);
+ scan2.setStartRow(cutoffPoint);
+
+ HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2);
PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 84c39db..84de288 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.Set;
@@ -35,12 +36,12 @@ import java.util.Set;
public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> {
private final String table;
- private final String scanAsString;
+ private final String scansAsString;
private transient SourceTarget parent;
- public HBaseData(String table, String scanAsString, SourceTarget<?> parent) {
+ public HBaseData(String table, String scansAsString, SourceTarget<?> parent) {
this.table = table;
- this.scanAsString = scanAsString;
+ this.scansAsString = scansAsString;
this.parent = parent;
}
@@ -63,7 +64,13 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu
TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration());
HTable htable = new HTable(hconf, table);
- Scan scan = HBaseSourceTarget.convertStringToScan(scanAsString);
- return new HTableIterable(htable, scan);
+
+ String[] scanStrings = StringUtils.getStrings(scansAsString);
+ Scan[] scans = new Scan[scanStrings.length];
+ for(int i = 0; i < scanStrings.length; i++){
+ scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]);
+ }
+
+ return new HTableIterable(htable, scans);
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index c1d7eb7..6ed3b42 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
public class HBaseSourceTarget extends HBaseTarget implements
ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>,
@@ -60,16 +62,31 @@ public class HBaseSourceTarget extends HBaseTarget implements
private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
- protected Scan scan;
- private FormatBundle<TableInputFormat> inputBundle;
+ protected Scan[] scans;
+ protected String scansAsString;
+ private FormatBundle<MultiTableInputFormat> inputBundle;
- public HBaseSourceTarget(String table, Scan scan) {
+ public HBaseSourceTarget(String table, Scan... scans) {
super(table);
- this.scan = scan;
+ this.scans = scans;
+
try {
- this.inputBundle = FormatBundle.forInput(TableInputFormat.class)
- .set(TableInputFormat.INPUT_TABLE, table)
- .set(TableInputFormat.SCAN, convertScanToString(scan));
+
+ byte[] tableName = Bytes.toBytes(table);
+ //Copy scans and enforce that they are for the table specified
+ Scan[] tableScans = new Scan[scans.length];
+ String[] scanStrings = new String[scans.length];
+ for(int i = 0; i < scans.length; i++){
+ tableScans[i] = new Scan(scans[i]);
+ //enforce Scan is for same table
+ tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName);
+ //Convert the Scan into a String
+ scanStrings[i] = convertScanToString(tableScans[i]);
+ }
+ this.scans = tableScans;
+ this.scansAsString = StringUtils.arrayToString(scanStrings);
+ this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class)
+ .set(MultiTableInputFormat.SCANS, scansAsString);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -103,7 +120,7 @@ public class HBaseSourceTarget extends HBaseTarget implements
@Override
public int hashCode() {
- return new HashCodeBuilder().append(table).append(scan).toHashCode();
+ return new HashCodeBuilder().append(table).append(scansAsString).toHashCode();
}
@Override
@@ -161,16 +178,12 @@ public class HBaseSourceTarget extends HBaseTarget implements
public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
Configuration hconf = HBaseConfiguration.create(conf);
HTable htable = new HTable(hconf, table);
- return new HTableIterable(htable, scan);
+ return new HTableIterable(htable, scans);
}
@Override
public ReadableData<Pair<ImmutableBytesWritable, Result>> asReadable() {
- try {
- return new HBaseData(table, convertScanToString(scan), this);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return new HBaseData(table, scansAsString, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
index c58732c..a3dfc7d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
@@ -26,23 +26,20 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
private final HTable table;
- private final Scan scan;
+ private final Scan[] scans;
- public HTableIterable(HTable table, Scan scan) {
+ public HTableIterable(HTable table, Scan... scans) {
this.table = table;
- this.scan = scan;
+ this.scans = scans;
}
@Override
public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
- try {
- return new HTableIterator(table, table.getScanner(scan));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return new HTableIterator(table, Arrays.asList(scans));
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
index daa4a48..d679b72 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
@@ -25,21 +25,30 @@ import org.apache.crunch.Pair;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
private static final Log LOG = LogFactory.getLog(HTableIterator.class);
private final HTable table;
- private final ResultScanner scanner;
- private final Iterator<Result> iter;
+ private final Iterator<Scan> scans;
+ private ResultScanner scanner;
+ private Iterator<Result> iter;
- public HTableIterator(HTable table, ResultScanner scanner) {
+ public HTableIterator(HTable table, List<Scan> scans) {
this.table = table;
- this.scanner = scanner;
+ this.scans = scans.iterator();
+ try{
+ this.scanner = table.getScanner(this.scans.next());
+ }catch(IOException ioe){
+ throw new RuntimeException(ioe);
+ }
this.iter = scanner.iterator();
}
@@ -48,10 +57,20 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
boolean hasNext = iter.hasNext();
if (!hasNext) {
scanner.close();
- try {
- table.close();
- } catch (IOException e) {
- LOG.error("Exception closing HTable: " + table.getTableName(), e);
+ hasNext = scans.hasNext();
+ if(hasNext){
+ try{
+ scanner = table.getScanner(this.scans.next());
+ iter = scanner.iterator();
+ } catch(IOException ioe){
+ throw new RuntimeException("Unable to create next scanner from "+ Bytes.toString(table.getTableName()), ioe);
+ }
+ } else {
+ try {
+ table.close();
+ } catch (IOException e) {
+ LOG.error("Exception closing HTable: " + Bytes.toString(table.getTableName()), e);
+ }
}
}
return hasNext;
http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4f82457..224d5a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@ under the License.
<mockito.version>1.9.0</mockito.version>
<pkg>org.apache.crunch</pkg>
<hadoop.version>1.1.2</hadoop.version>
- <hbase.version>0.94.3</hbase.version>
+ <hbase.version>0.94.15</hbase.version>
<scala.base.version>2.10</scala.base.version>
<scala.version>2.10.4</scala.version>