You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/12/05 01:40:57 UTC
git commit: CRUNCH-123: Add support for Deletes to crunch-hbase.
Contributed by Micah Whitacre.
Updated Branches:
refs/heads/master 63050d0d4 -> 374bf3de6
CRUNCH-123: Add support for Deletes to crunch-hbase. Contributed by Micah Whitacre.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/374bf3de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/374bf3de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/374bf3de
Branch: refs/heads/master
Commit: 374bf3de658f8e519da0fc7306f31793e4c075b2
Parents: 63050d0
Author: Josh Wills <jw...@apache.org>
Authored: Tue Dec 4 16:36:57 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Dec 4 16:36:57 2012 -0800
----------------------------------------------------------------------
.../apache/crunch/io/hbase/WordCountHBaseIT.java | 35 +++++++++++++++
.../org/apache/crunch/io/hbase/HBaseTarget.java | 3 +-
2 files changed, 37 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/374bf3de/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index f13edeb..51abdaa 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -97,6 +98,18 @@ public class WordCountHBaseIT {
}, Writables.writables(Put.class));
}
+ /** Maps each scanned (row key, Result) pair to a Delete for that row; no family/qualifier is set, so the whole row is targeted. */
+ @SuppressWarnings("serial") // the anonymous DoFn below is Serializable but declares no serialVersionUID
+ public static PCollection<Delete> clearCounts(PTable<ImmutableBytesWritable, Result> counts) {
+ return counts.parallelDo("convert to delete", new DoFn<Pair<ImmutableBytesWritable, Result>, Delete>() {
+ @Override
+ public void process(Pair<ImmutableBytesWritable, Result> input, Emitter<Delete> emitter) {
+ Delete delete = new Delete(input.first().get()); // built from the row key bytes only (input.first() is the key)
+ emitter.emit(delete);
+ }
+ // NOTE(review): presumably compiled as WordCountHBaseIT$3, which the test jar lists via jarUp; confirm if anonymous classes are reordered.
+ }, Writables.writables(Delete.class));
+ }
@Before
public void setUp() throws Exception {
@@ -151,6 +164,7 @@ public class WordCountHBaseIT {
jarUp(jos, baseDir, prefix + "WordCountHBaseIT.class");
jarUp(jos, baseDir, prefix + "WordCountHBaseIT$1.class");
jarUp(jos, baseDir, prefix + "WordCountHBaseIT$2.class");
+ jarUp(jos, baseDir, prefix + "WordCountHBaseIT$3.class");
jos.close();
Path target = new Path(tmpPath, "WordCountHBaseIT.jar");
@@ -205,6 +219,20 @@ public class WordCountHBaseIT {
assertIsLong(outputTable, "cat", 2);
assertIsLong(outputTable, "dog", 1);
+
+ //verify HBaseTarget supports deletes.
+ Scan clearScan = new Scan();
+ clearScan.addColumn(COUNTS_COLFAM, null);
+ pipeline = new MRPipeline(WordCountHBaseIT.class, hbaseTestUtil.getConfiguration());
+ HBaseSourceTarget clearSource = new HBaseSourceTarget(outputTableName, clearScan);
+ PTable<ImmutableBytesWritable, Result> counts = pipeline.read(clearSource);
+ pipeline.write(clearCounts(counts), new HBaseTarget(outputTableName));
+ pipeline.done();
+
+ assertDeleted(outputTable, "cat");
+ assertDeleted(outputTable, "dog");
+
+
} finally {
// not quite sure...
}
@@ -226,4 +254,11 @@ public class WordCountHBaseIT {
assertTrue(rawCount != null);
assertEquals(new Long(i), new Long(Bytes.toLong(rawCount)));
}
+ /** Asserts that a Get of row {@code key}, restricted to COUNTS_COLFAM with a null qualifier, returns an empty Result. */
+ protected void assertDeleted(HTable table, String key) throws IOException {
+ Get get = new Get(Bytes.toBytes(key));
+ get.addColumn(COUNTS_COLFAM, null); // same family/qualifier selection as the clear scan in the test body
+ Result result = table.get(get);
+ assertTrue(result.isEmpty()); // empty Result: no cell came back for the requested column
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/374bf3de/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index c659c86..48593b8 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -29,6 +29,7 @@ import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -69,7 +70,7 @@ public class HBaseTarget implements MapReduceTarget {
@Override
public boolean accept(OutputHandler handler, PType<?> ptype) {
- if (Put.class.equals(ptype.getTypeClass())) {
+ if (Put.class.equals(ptype.getTypeClass()) || Delete.class.equals(ptype.getTypeClass())) {
handler.configure(this, ptype);
return true;
}