Posted to commits@crunch.apache.org by mk...@apache.org on 2014/05/28 22:37:01 UTC

git commit: CRUNCH-387: Added support for multiple HBase Scans to the 0.8 code stream

Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 dbec907af -> 8fe96d6cc


CRUNCH-387: Added support for multiple HBase Scans to the 0.8 code stream


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8fe96d6c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8fe96d6c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8fe96d6c

Branch: refs/heads/apache-crunch-0.8
Commit: 8fe96d6ccb7bb53a5fa0c3f011f3e6e6e8133838
Parents: dbec907
Author: Micah Whitacre <mk...@apache.org>
Authored: Wed May 28 10:57:31 2014 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Wed May 28 12:09:06 2014 -0500

----------------------------------------------------------------------
 .../crunch/io/hbase/WordCountHBaseIT.java       | 13 +++++-
 .../org/apache/crunch/io/hbase/HBaseData.java   | 17 +++++---
 .../crunch/io/hbase/HBaseSourceTarget.java      | 43 +++++++++++++-------
 .../apache/crunch/io/hbase/HTableIterable.java  | 13 +++---
 .../apache/crunch/io/hbase/HTableIterator.java  | 35 ++++++++++++----
 pom.xml                                         |  2 +-
 6 files changed, 85 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
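
The headline change is that HBaseSourceTarget now accepts a varargs list of
Scans over a single table. A minimal usage sketch, not taken from the patch:
the table name "words", the column family "word", the split point "m", and
the pipeline variable are all illustrative.

    Scan firstHalf = new Scan();
    firstHalf.addFamily(Bytes.toBytes("word"));
    firstHalf.setStopRow(Bytes.toBytes("m"));     // stop row is exclusive
    Scan secondHalf = new Scan();
    secondHalf.addFamily(Bytes.toBytes("word"));
    secondHalf.setStartRow(Bytes.toBytes("m"));   // start row is inclusive

    HBaseSourceTarget source = new HBaseSourceTarget("words", firstHalf, secondHalf);
    PTable<ImmutableBytesWritable, Result> rows = pipeline.read(source);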


http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index af32c1a..2bbb70b 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -69,6 +69,8 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.io.ByteStreams;
 
+import javax.ws.rs.HEAD;
+
 public class WordCountHBaseIT {
 
   static class StringifyFn extends MapFn<Pair<ImmutableBytesWritable, Pair<Result, Result>>, String> {
@@ -237,9 +239,18 @@ public class WordCountHBaseIT {
     key = put(inputTable, key, "cat");
     key = put(inputTable, key, "cat");
     key = put(inputTable, key, "dog");
+    inputTable.flushCommits();
+
+    // Set up the read using multiple scans that simply cut the rows in half.
     Scan scan = new Scan();
     scan.addFamily(WORD_COLFAM);
-    HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
+    byte[] cutoffPoint = Bytes.toBytes(2);
+    scan.setStopRow(cutoffPoint);
+    Scan scan2 = new Scan();
+    scan2.addFamily(WORD_COLFAM);
+    scan2.setStartRow(cutoffPoint);
+
+    HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan, scan2);
     PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
 
     Map<ImmutableBytesWritable, Result> materialized = words.materializeToMap();
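
Two details in the test are easy to miss. flushCommits() pushes any
client-side buffered Puts to the region servers before the scans run, so
both halves of the read see every row written above. And because a Scan's
start row is inclusive while its stop row is exclusive, the two ranges form
a half-open split that covers each row exactly once. A minimal sketch of
that split, with an illustrative cutoff value:

    byte[] cutoff = Bytes.toBytes(2);
    Scan low = new Scan();
    low.setStopRow(cutoff);       // rows strictly below the cutoff
    Scan high = new Scan();
    high.setStartRow(cutoff);     // rows at or above the cutoff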

http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
index 84c39db..84de288 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseData.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.util.Set;
@@ -35,12 +36,12 @@ import java.util.Set;
 public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Result>> {
 
   private final String table;
-  private final String scanAsString;
+  private final String scansAsString;
   private transient SourceTarget parent;
 
-  public HBaseData(String table, String scanAsString, SourceTarget<?> parent) {
+  public HBaseData(String table, String scansAsString, SourceTarget<?> parent) {
     this.table = table;
-    this.scanAsString = scanAsString;
+    this.scansAsString = scansAsString;
     this.parent = parent;
   }
 
@@ -63,7 +64,13 @@ public class HBaseData implements ReadableData<Pair<ImmutableBytesWritable, Resu
       TaskInputOutputContext<?, ?, ?, ?> ctxt) throws IOException {
     Configuration hconf = HBaseConfiguration.create(ctxt.getConfiguration());
     HTable htable = new HTable(hconf, table);
-    Scan scan = HBaseSourceTarget.convertStringToScan(scanAsString);
-    return new HTableIterable(htable, scan);
+
+    String[] scanStrings = StringUtils.getStrings(scansAsString);
+    Scan[] scans = new Scan[scanStrings.length];
+    for (int i = 0; i < scanStrings.length; i++) {
+      scans[i] = HBaseSourceTarget.convertStringToScan(scanStrings[i]);
+    }
+
+    return new HTableIterable(htable, scans);
   }
 }
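
HBaseData now carries all scans in one string: the individual Base64-encoded
Scans are joined with Hadoop's StringUtils.arrayToString and split back out
with StringUtils.getStrings, which is unambiguous because the Base64 alphabet
contains no commas. A minimal sketch of the round-trip; scanString1 and
scanString2 stand in for already-serialized Scans:

    String joined = StringUtils.arrayToString(
        new String[] { scanString1, scanString2 });   // "scanString1,scanString2"
    String[] parts = StringUtils.getStrings(joined);  // back to two entries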

http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index c1d7eb7..6ed3b42 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -46,10 +46,12 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
 
 public class HBaseSourceTarget extends HBaseTarget implements
     ReadableSourceTarget<Pair<ImmutableBytesWritable, Result>>,
@@ -60,16 +62,31 @@ public class HBaseSourceTarget extends HBaseTarget implements
   private static final PTableType<ImmutableBytesWritable, Result> PTYPE = Writables.tableOf(
       Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
 
-  protected Scan scan;
-  private FormatBundle<TableInputFormat> inputBundle;
+  protected Scan[] scans;
+  protected String scansAsString;
+  private FormatBundle<MultiTableInputFormat> inputBundle;
   
-  public HBaseSourceTarget(String table, Scan scan) {
+  public HBaseSourceTarget(String table, Scan... scans) {
     super(table);
-    this.scan = scan;
+    this.scans = scans;
+
     try {
-      this.inputBundle = FormatBundle.forInput(TableInputFormat.class)
-          .set(TableInputFormat.INPUT_TABLE, table)
-          .set(TableInputFormat.SCAN, convertScanToString(scan));
+
+      byte[] tableName = Bytes.toBytes(table);
+      //Copy scans and enforce that they are for the table specified
+      Scan[] tableScans = new Scan[scans.length];
+      String[] scanStrings = new String[scans.length];
+      for (int i = 0; i < scans.length; i++) {
+        tableScans[i] = new Scan(scans[i]);
+        //enforce Scan is for same table
+        tableScans[i].setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName);
+        //Convert the Scan into a String
+        scanStrings[i] = convertScanToString(tableScans[i]);
+      }
+      this.scans = tableScans;
+      this.scansAsString = StringUtils.arrayToString(scanStrings);
+      this.inputBundle = FormatBundle.forInput(MultiTableInputFormat.class)
+          .set(MultiTableInputFormat.SCANS, scansAsString);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -103,7 +120,7 @@ public class HBaseSourceTarget extends HBaseTarget implements
 
   @Override
   public int hashCode() {
-    return new HashCodeBuilder().append(table).append(scan).toHashCode();
+    return new HashCodeBuilder().append(table).append(scansAsString).toHashCode();
   }
 
   @Override
@@ -161,16 +178,12 @@ public class HBaseSourceTarget extends HBaseTarget implements
   public Iterable<Pair<ImmutableBytesWritable, Result>> read(Configuration conf) throws IOException {
     Configuration hconf = HBaseConfiguration.create(conf);
     HTable htable = new HTable(hconf, table);
-    return new HTableIterable(htable, scan);
+    return new HTableIterable(htable, scans);
   }
 
   @Override
   public ReadableData<Pair<ImmutableBytesWritable, Result>> asReadable() {
-    try {
-      return new HBaseData(table, convertScanToString(scan), this);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    return new HBaseData(table, scansAsString, this);
   }
 
   @Override
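
The switch from TableInputFormat to MultiTableInputFormat is what makes
multiple Scans possible: TableInputFormat takes one serialized Scan under its
SCAN key, while MultiTableInputFormat takes a comma-separated list under
SCANS and expects each Scan to carry its table name as an attribute. That is
why the constructor copies every caller-supplied Scan and pins the copy to
the source table. A minimal sketch of that per-scan preparation (callerScan
and the table name are illustrative; in HBase 0.94 the Scan copy constructor
can throw IOException, which the surrounding try/catch absorbs):

    Scan pinned = new Scan(callerScan);   // defensive copy, caller's Scan untouched
    pinned.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("words"));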

http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
index c58732c..a3dfc7d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterable.java
@@ -26,23 +26,20 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 
 class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> {
   private final HTable table;
-  private final Scan scan;
+  private final Scan[] scans;
 
-  public HTableIterable(HTable table, Scan scan) {
+  public HTableIterable(HTable table, Scan... scans) {
     this.table = table;
-    this.scan = scan;
+    this.scans = scans;
   }
 
   @Override
   public Iterator<Pair<ImmutableBytesWritable, Result>> iterator() {
-    try {
-      return new HTableIterator(table, table.getScanner(scan));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    return new HTableIterator(table, Arrays.asList(scans));
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
index daa4a48..d679b72 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HTableIterator.java
@@ -25,21 +25,30 @@ import org.apache.crunch.Pair;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
 class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
   private static final Log LOG = LogFactory.getLog(HTableIterator.class);
 
   private final HTable table;
-  private final ResultScanner scanner;
-  private final Iterator<Result> iter;
+  private final Iterator<Scan> scans;
+  private ResultScanner scanner;
+  private Iterator<Result> iter;
 
-  public HTableIterator(HTable table, ResultScanner scanner) {
+  public HTableIterator(HTable table, List<Scan> scans) {
     this.table = table;
-    this.scanner = scanner;
+    this.scans = scans.iterator();
+    try {
+      this.scanner = table.getScanner(this.scans.next());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
     this.iter = scanner.iterator();
   }
 
@@ -48,10 +57,20 @@ class HTableIterator implements Iterator<Pair<ImmutableBytesWritable, Result>> {
     boolean hasNext = iter.hasNext();
     if (!hasNext) {
       scanner.close();
-      try {
-        table.close();
-      } catch (IOException e) {
-        LOG.error("Exception closing HTable: " + table.getTableName(), e);
+      hasNext = scans.hasNext();
+      if (hasNext) {
+        try {
+          scanner = table.getScanner(this.scans.next());
+          iter = scanner.iterator();
+        } catch (IOException ioe) {
+          throw new RuntimeException("Unable to create next scanner from " + Bytes.toString(table.getTableName()), ioe);
+        }
+      } else {
+        try {
+          table.close();
+        } catch (IOException e) {
+          LOG.error("Exception closing HTable: " + Bytes.toString(table.getTableName()), e);
+        }
       }
     }
     return hasNext;
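
HTableIterator now chains one ResultScanner per Scan: when the current
scanner runs dry it is closed and, if another Scan remains, a fresh scanner
is opened from it; the HTable itself is closed only after the last scanner is
exhausted. An eager, illustrative equivalent of that incremental logic
(scanList and process(...) are hypothetical names, not part of the patch):

    static void scanAll(HTable table, List<Scan> scanList) throws IOException {
      try {
        for (Scan s : scanList) {
          ResultScanner rs = table.getScanner(s);
          try {
            for (Result r : rs) {
              process(r);   // stand-in for handing the Result to the caller
            }
          } finally {
            rs.close();
          }
        }
      } finally {
        table.close();
      }
    }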

http://git-wip-us.apache.org/repos/asf/crunch/blob/8fe96d6c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4f82457..224d5a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,7 +86,7 @@ under the License.
     <mockito.version>1.9.0</mockito.version>
     <pkg>org.apache.crunch</pkg>
     <hadoop.version>1.1.2</hadoop.version>
-    <hbase.version>0.94.3</hbase.version>
+    <hbase.version>0.94.15</hbase.version>
 
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>