You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/05 01:40:57 UTC

git commit: CRUNCH-123: Add support for Deletes to crunch-hbase. Contributed by Micah Whitacre.

Updated Branches:
  refs/heads/master 63050d0d4 -> 374bf3de6


CRUNCH-123: Add support for Deletes to crunch-hbase. Contributed by Micah Whitacre.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/374bf3de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/374bf3de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/374bf3de

Branch: refs/heads/master
Commit: 374bf3de658f8e519da0fc7306f31793e4c075b2
Parents: 63050d0
Author: Josh Wills <jw...@apache.org>
Authored: Tue Dec 4 16:36:57 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Dec 4 16:36:57 2012 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/WordCountHBaseIT.java   |   35 +++++++++++++++
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |    3 +-
 2 files changed, 37 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/374bf3de/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index f13edeb..51abdaa 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
@@ -97,6 +98,18 @@ public class WordCountHBaseIT {
 
     }, Writables.writables(Put.class));
   }
+  
+  @SuppressWarnings("serial")
+  public static PCollection<Delete> clearCounts(PTable<ImmutableBytesWritable, Result> counts) {
+    return counts.parallelDo("convert to delete", new DoFn<Pair<ImmutableBytesWritable, Result>, Delete>() {
+      @Override
+      public void process(Pair<ImmutableBytesWritable, Result> input, Emitter<Delete> emitter) {
+        Delete delete = new Delete(input.first().get());
+        emitter.emit(delete);
+      }
+
+    }, Writables.writables(Delete.class));
+  }
 
   @Before
   public void setUp() throws Exception {
@@ -151,6 +164,7 @@ public class WordCountHBaseIT {
       jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class");
       jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class");
       jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class");
+      jarUp(jos, baseDir, prefix + "WordCountHBaseIT$3.class");
       jos.close();
 
       Path target = new Path(tmpPath, "WordCountHBaseIT.jar");
@@ -205,6 +219,20 @@ public class WordCountHBaseIT {
 
       assertIsLong(outputTable, "cat", 2);
       assertIsLong(outputTable, "dog", 1);
+      
+      //verify HBaseTarget supports deletes.
+      Scan clearScan = new Scan();
+      clearScan.addColumn(COUNTS_COLFAM, null);
+      pipeline = new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration());
+      HBaseSourceTarget clearSource = new HBaseSourceTarget(outputTableName, clearScan);
+      PTable<ImmutableBytesWritable, Result> counts = pipeline.read(clearSource);
+      pipeline.write(clearCounts(counts), new HBaseTarget(outputTableName));
+      pipeline.done();
+      
+      assertDeleted(outputTable, "cat");
+      assertDeleted(outputTable, "dog");
+      
+      
     } finally {
       // not quite sure...
     }
@@ -226,4 +254,11 @@ public class WordCountHBaseIT {
     assertTrue(rawCount != null);
     assertEquals(new Long(i), new Long(Bytes.toLong(rawCount)));
   }
+  
+  protected void assertDeleted(HTable table, String key) throws IOException {
+      Get get = new Get(Bytes.toBytes(key));
+      get.addColumn(COUNTS_COLFAM, null);
+      Result result = table.get(get);
+      assertTrue(result.isEmpty());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/374bf3de/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index c659c86..48593b8 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -29,6 +29,7 @@ import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -69,7 +70,7 @@ public class HBaseTarget implements MapReduceTarget {
 
   @Override
   public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if (Put.class.equals(ptype.getTypeClass())) {
+    if (Put.class.equals(ptype.getTypeClass()) || Delete.class.equals(ptype.getTypeClass())) {
       handler.configure(this, ptype);
       return true;
     }