You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:21:04 UTC
svn commit: r1181568 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: nspiegelberg
Date: Tue Oct 11 02:21:01 2011
New Revision: 1181568
URL: http://svn.apache.org/viewvc?rev=1181568&view=rev
Log:
Adding RowMutation for being able to do both Delete and Put in MR jobs
Summary:
I have added a union-style class called RowMutation to encapsulate either a
Delete or a Put in MapReduce jobs.
This is going to be useful in gathering up mutations for batch import jobs that
also require deletes to be made available.
I have also added a matching RowMutationSortReducer modified from the initial
PutSortReducer so as to know how to sort these elements in a MR job.
Finally, I added a line for selecting the proper reducer in the
HFileOutputFormat, due to the rather hacky nature of checking for instances
of input...
Test Plan:
I've implemented a small unit test which creates a MR job with a mapper that
statically outputs a Put and a Delete. The new reducer properly sorts them and
outputs them and the test will dump to a file using HFileOutputFormat.
Reviewed By: kannan
Reviewers: kannan, jfan, nspiegelberg
Commenters: nspiegelberg, gqchen
CC: kannan, nspiegelberg, gqchen, bogdan
Differential Revision: 267252
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java?rev=1181568&r1=1181567&r2=1181568&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/Row.java Tue Oct 11 02:21:01 2011
@@ -19,10 +19,12 @@
*/
package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.io.WritableComparable;
+
/**
* Has a row.
*/
-interface Row {
+public interface Row extends WritableComparable<Row> {
/**
* @return The row.
*/
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java?rev=1181568&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/RowMutation.java Tue Oct 11 02:21:01 2011
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Class to be used in Bulk Import MR jobs to be able to pass both Deletes and Puts in as parameters.
+ */
+public class RowMutation implements Row {
+
+  /**
+   * Tag byte written ahead of the wrapped mutation so that
+   * {@link #readFields(DataInput)} knows which concrete type to rebuild.
+   */
+  public static enum Type {
+    Put((byte)4),
+    Delete((byte)8);
+    private final byte code;
+    Type(final byte c) {
+      this.code = c;
+    }
+
+    /** @return the code of this union */
+    public byte getCode() {
+      return this.code;
+    }
+  }
+
+  /**
+   * Union type of field to hold either a Put or Delete; stays null on an
+   * instance built by the no-arg constructor until readFields() fills it.
+   */
+  private Row row = null;
+
+  /** To be used for Writable. DO NOT USE!!! */
+  public RowMutation() {}
+
+  /**
+   * Copy constructor. The wrapped Put or Delete is itself copied, so the new
+   * instance does not share it with r.
+   * @param r the item to copy
+   * @throws IOException if r is null
+   */
+  public RowMutation(final RowMutation r) throws IOException {
+    if (null == r) {
+      throw new IOException("Cannot pass a null object to constructor");
+    }
+    // r may wrap nothing yet (no-arg constructor); mirror that instead of failing
+    final Row inner = r.getInstance();
+    row = (null == inner) ? null : copyOf(inner);
+  }
+
+  /**
+   * Constructor to set the inner union style field.
+   * @param request -- the Put or Delete to be executed; a copy of it is stored
+   * @throws IOException if passed parameter is not of required type
+   */
+  public RowMutation(final WritableComparable<Row> request) throws IOException {
+    row = copyOf(request);
+  }
+
+  /**
+   * Copies a Put or Delete through its own copy constructor.
+   * @param request the mutation to copy
+   * @return a new Put or Delete
+   * @throws IOException if request is neither a Put nor a Delete
+   */
+  private static Row copyOf(final WritableComparable<Row> request) throws IOException {
+    if (request instanceof Put) {
+      return new Put((Put)request);
+    } else if (request instanceof Delete) {
+      return new Delete((Delete)request);
+    }
+    throw new IOException("Must pass either a Delete or a Put");
+  }
+
+  /**
+   * Method for getting the Row instance from inside
+   * @return the wrapped Put or Delete itself, not a copy
+   */
+  public Row getInstance() {
+    return row;
+  }
+
+  /** Delegates the comparison to the wrapped mutation. */
+  @Override
+  public int compareTo(Row o) {
+    return row.compareTo(o);
+  }
+
+  /** @return the row of the wrapped mutation */
+  @Override
+  public byte[] getRow() {
+    return row.getRow();
+  }
+
+  /** Reads the type tag, then lets a fresh Put or Delete read its own fields. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    final byte b = in.readByte();
+    if (Type.Put.getCode() == b) {
+      row = new Put();
+    } else if (Type.Delete.getCode() == b) {
+      row = new Delete();
+    } else {
+      throw new IOException("Tried to read an invalid type of serialized object!");
+    }
+    row.readFields(in);
+  }
+
+  /** Writes the type tag, then the wrapped mutation's own serialization. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    final byte b;
+    if (row instanceof Put) {
+      b = Type.Put.getCode();
+    } else if (row instanceof Delete) {
+      b = Type.Delete.getCode();
+    } else {
+      throw new IOException("Tried to write an invalid type of object to serialize!");
+    }
+    out.writeByte(b);
+    row.write(out);
+  }
+}
\ No newline at end of file
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181568&r1=1181567&r2=1181568&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:21:01 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -307,6 +308,8 @@ public class HFileOutputFormat extends F
job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
+ } else if (RowMutation.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(RowMutationSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java?rev=1181568&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/RowMutationSortReducer.java Tue Oct 11 02:21:01 2011
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted Puts and Deletes.
+ * Reads in all Puts and Deletes from passed Iterator (over RowMutation elements), sorts them, then emits
+ * KeyValue items in sorted order. If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat
+ * @see KeyValueSortReducer
+ */
+public class RowMutationSortReducer extends
+    Reducer<ImmutableBytesWritable, RowMutation, ImmutableBytesWritable, KeyValue> {
+  /**
+   * Sorts the KeyValues of every Put and Delete received for one key and writes
+   * them out in KeyValue.COMPARATOR order, in batches bounded by the
+   * "rowmutationsortreducer.row.threshold" setting.
+   * @param row the key every emitted KeyValue is written under
+   * @param requests the mutations to sort
+   * @param context the reducer context used for configuration, status and output
+   */
+  @Override
+  protected void reduce(ImmutableBytesWritable row, java.lang.Iterable<RowMutation> requests,
+      Reducer<ImmutableBytesWritable, RowMutation, ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "rowmutationsortreducer.row.threshold", 2L * (1<<30));
+    Iterator<RowMutation> iter = requests.iterator();
+    while (iter.hasNext()) {
+      TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Row r = iter.next().getInstance();
+        Map< byte[], List<KeyValue> > familyMap = null;
+        if (r instanceof Put) {
+          familyMap = ((Put) r).getFamilyMap();
+        } else if (r instanceof Delete) {
+          familyMap = ((Delete) r).getFamilyMap();
+        }
+        if (null != familyMap) {
+          for (List<KeyValue> kvs : familyMap.values()) {
+            for (KeyValue kv : kvs) {
+              map.add(kv);
+              curSize += kv.getValueLength(); // only value bytes are counted
+            }
+          }
+        }
+      }
+      context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : map) {
+        context.write(row, kv);
+        // count each write; without the increment the status below never fired
+        if (++index % 100 == 0) {
+          context.setStatus("Wrote " + index);
+        }
+      }
+      if (iter.hasNext()) {
+        // more to process: force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}
\ No newline at end of file
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181568&r1=1181567&r2=1181568&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:21:01 2011
@@ -46,11 +46,17 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.RowMutation;
+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
@@ -124,7 +130,6 @@ public class TestHFileOutputFormat {
ImmutableBytesWritable,KeyValue>.Context context)
throws java.io.IOException ,InterruptedException
{
-
byte keyBytes[] = new byte[keyLength];
byte valBytes[] = new byte[valLength];
@@ -154,7 +159,6 @@ public class TestHFileOutputFormat {
util.cleanupTestDir();
}
-
private void setupRandomGeneratorMapper(Job job) {
job.setInputFormatClass(NMapInputFormat.class);
job.setMapperClass(RandomKVGeneratingMapper.class);
@@ -162,6 +166,104 @@ public class TestHFileOutputFormat {
job.setMapOutputValueClass(KeyValue.class);
}
+ static class RowSorterMapper
+ extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, RowMutation> {
+ @Override
+ protected void map(
+ NullWritable n1, NullWritable n2,
+ Mapper<NullWritable, NullWritable,
+ ImmutableBytesWritable, RowMutation>.Context context)
+ throws IOException ,InterruptedException
+ {
+ byte[] row = Bytes.toBytes("row1");
+
+ // need one for every task...
+ byte[] key = Bytes.toBytes("key");
+
+ byte[] col1 = Bytes.toBytes("col1");
+ byte[] col2 = Bytes.toBytes("col2");
+ byte[] col3 = Bytes.toBytes("col3");
+
+ // PUT cf=info-A
+ Row put1 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[0], col1, 10, Bytes.toBytes("val10"));
+ Row put2 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[0], col2, 11, Bytes.toBytes("val11"));
+
+ // PUT cf=info-B
+ Row put3 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[1], col1, 20, Bytes.toBytes("val20"));
+ Row put4 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[1], col2, 21, Bytes.toBytes("val21"));
+
+ // PUT new column
+ Row put5 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[1], col3, 30, Bytes.toBytes("val30"));
+ Row put6 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[1], col3, 31, Bytes.toBytes("val31"));
+ Row put7 = new Put(row).add(
+ TestHFileOutputFormat.FAMILIES[1], col3, 32, Bytes.toBytes("val32"));
+
+ // DELETEs
+ Row del1 = new Delete(row).deleteColumn(
+ TestHFileOutputFormat.FAMILIES[1], col2, 21);
+
+ Row del2 = new Delete(row).deleteFamily(
+ TestHFileOutputFormat.FAMILIES[0]);
+
+ Row del3 = new Delete(row).deleteColumns(
+ TestHFileOutputFormat.FAMILIES[1], col3);
+
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put1));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put2));
+
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put3));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put4));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put5));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put6));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(put7));
+
+ context.write(new ImmutableBytesWritable(key), new RowMutation(del1));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(del2));
+ context.write(new ImmutableBytesWritable(key), new RowMutation(del3));
+ }
+ }
+
+  /**
+   * Test for the union style MR jobs that runs both Put and Delete requests.
+   * Runs RowSorterMapper through RowMutationSortReducer into HFileOutputFormat
+   * and only checks that the job completes successfully.
+   * @throws Exception on job, sorting, IO or fs errors
+   */
+  @Test
+  public void testRowSortReducer()
+  throws Exception {
+    Configuration conf = new Configuration(this.util.getConfiguration());
+    conf.setInt("io.sort.mb", 20);
+    Path dir = HBaseTestingUtility.getTestDir("testRowSortReducer");
+    try {
+      Job job = new Job(conf);
+      job.setInputFormatClass(NMapInputFormat.class);
+      job.setOutputFormatClass(HFileOutputFormat.class);
+
+      job.setMapperClass(RowSorterMapper.class); // local
+      job.setReducerClass(RowMutationSortReducer.class);
+
+      job.setOutputKeyClass(ImmutableBytesWritable.class);
+      job.setOutputValueClass(KeyValue.class);
+
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+      job.setMapOutputValueClass(RowMutation.class);
+
+      FileOutputFormat.setOutputPath(job, dir);
+
+      assertTrue(job.waitForCompletion(false));
+    } finally {
+      // remove the job output directory whether or not the job succeeded
+      dir.getFileSystem(conf).delete(dir, true);
+    }
+  }
+
/**
* Test that {@link HFileOutputFormat} RecordWriter amends timestamps if
* passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
@@ -367,8 +469,6 @@ public class TestHFileOutputFormat {
}
}
-
-
private void runIncrementalPELoad(
Configuration conf, HTable table, Path outDir)
throws Exception {