You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:09:32 UTC
svn commit: r1181452 - in /hbase/branches/0.89/src:
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: nspiegelberg
Date: Tue Oct 11 02:09:31 2011
New Revision: 1181452
URL: http://svn.apache.org/viewvc?rev=1181452&view=rev
Log:
HBASE-1861 : Multi-CF support for HFileOutputFormat
Summary:
Added ability for multiple column families to be concurrently written to
by HFOF.
Removed unnecessary dependency on Google jar
Test Plan:
- ant test-core
DiffCamp Revision: 183938
Reviewed By: kannan
CC: nspiegelberg, kannan, ruifang
Revert Plan:
OK
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Oct 11 02:09:31 2011
@@ -53,8 +53,6 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import com.google.common.base.Preconditions;
-
/**
* Writes HFiles. Passed KeyValues must arrive in order.
* Currently, can only write files to a single column family at a
@@ -74,8 +72,10 @@ public class HFileOutputFormat extends F
Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
- final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
- final int blocksize = conf.getInt("hfile.min.blocksize.size", 65536);
+ final long maxsize = conf.getLong("hbase.hregion.max.filesize",
+ HConstants.DEFAULT_MAX_FILE_SIZE);
+ final int blocksize = conf.getInt("hfile.min.blocksize.size",
+ HFile.DEFAULT_BLOCKSIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String compression = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
@@ -87,48 +87,77 @@ public class HFileOutputFormat extends F
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+ private boolean rollRequested = false;
public void write(ImmutableBytesWritable row, KeyValue kv)
throws IOException {
+ // null input == user explicitly wants to flush
+ if (row == null && kv == null) {
+ rollWriters();
+ return;
+ }
+
+ byte [] rowKey = kv.getRow();
long length = kv.getLength();
byte [] family = kv.getFamily();
WriterLength wl = this.writers.get(family);
- if (wl == null || ((length + wl.written) >= maxsize) &&
- Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
- kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
- // Get a new writer.
- Path basedir = new Path(outputdir, Bytes.toString(family));
- if (wl == null) {
- wl = new WriterLength();
- this.writers.put(family, wl);
- if (this.writers.size() > 1) throw new IOException("One family only");
- // If wl == null, first file in family. Ensure family dir exits.
- if (!fs.exists(basedir)) fs.mkdirs(basedir);
- }
- wl.writer = getNewWriter(wl.writer, basedir);
- LOG.info("Writer=" + wl.writer.getPath() +
- ((wl.written == 0)? "": ", wrote=" + wl.written));
- wl.written = 0;
+
+ // If this is a new column family, verify that the directory exists
+ if (wl == null) {
+ fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+ }
+
+ // If any of the HFiles for the column families has reached
+ // maxsize, we need to roll all the writers
+ if (wl != null && wl.written + length >= maxsize) {
+ this.rollRequested = true;
+ }
+
+ // This can only happen once a row is finished though
+ if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ rollWriters();
+ }
+
+ // create a new HFile writer, if necessary
+ if (wl == null || wl.writer == null) {
+ wl = getNewWriter(family);
}
+
+ // we now have the proper HFile writer. full steam ahead
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
+
// Copy the row so we know when a row transition.
- this.previousRow = kv.getRow();
+ this.previousRow = rowKey;
}
- /* Create a new HFile.Writer. Close current if there is one.
- * @param writer
- * @param familydir
- * @return A new HFile.Writer.
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info("Writer=" + wl.writer.getPath() +
+ ((wl.written == 0)? "": ", wrote=" + wl.written));
+ close(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+ this.rollRequested = false;
+ }
+
+ /* Create a new HFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new HFile.Writer.
* @throws IOException
*/
- private HFile.Writer getNewWriter(final HFile.Writer writer,
- final Path familydir)
- throws IOException {
- close(writer);
- return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
- blocksize, bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+ private WriterLength getNewWriter(byte[] family) throws IOException {
+ WriterLength wl = new WriterLength();
+ Path familydir = new Path(outputdir, Bytes.toString(family));
+ wl.writer = new HFile.Writer(fs,
+ StoreFile.getUniqueFile(fs, familydir), blocksize,
+ bytesPerChecksum, compression, KeyValue.KEY_COMPARATOR);
+ this.writers.put(family, wl);
+ return wl;
}
private void close(final HFile.Writer w) throws IOException {
@@ -145,8 +174,8 @@ public class HFileOutputFormat extends F
public void close(TaskAttemptContext c)
throws IOException, InterruptedException {
- for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
- close(e.getValue().writer);
+ for (WriterLength wl: this.writers.values()) {
+ close(wl.writer);
}
}
};
@@ -183,7 +212,9 @@ public class HFileOutputFormat extends F
*/
private static void writePartitions(Configuration conf, Path partitionsPath,
List<ImmutableBytesWritable> startKeys) throws IOException {
- Preconditions.checkArgument(!startKeys.isEmpty(), "No regions passed");
+ if (startKeys.isEmpty()) {
+ throw new IllegalArgumentException("No regions passed");
+ }
// We're generating a list of split points, and we don't ever
// have keys < the first region (which has an empty start key)
@@ -193,10 +224,11 @@ public class HFileOutputFormat extends F
new TreeSet<ImmutableBytesWritable>(startKeys);
ImmutableBytesWritable first = sorted.first();
- Preconditions.checkArgument(
- first.equals(HConstants.EMPTY_BYTE_ARRAY),
- "First region of table should have empty start key. Instead has: %s",
- Bytes.toStringBinary(first.get()));
+ if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IllegalArgumentException(
+ "First region of table should have empty start key. Instead has: "
+ + Bytes.toStringBinary(first.get()));
+ }
sorted.remove(first);
// Write the actual file
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java Tue Oct 11 02:09:31 2011
@@ -290,6 +290,8 @@ public class ImportTsv {
}
TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
+ com.google.common.base.Function.class /* Guava used by TsvParser */);
return job;
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java Tue Oct 11 02:09:31 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
/**
* Emits sorted Puts.
@@ -46,21 +48,37 @@ public class PutSortReducer extends
ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException
{
- TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-
- for (Put p : puts) {
- for (List<KeyValue> kvs : p.getFamilyMap().values()) {
- for (KeyValue kv : kvs) {
- map.add(kv.clone());
+ // although reduce() is called per-row, handle pathological case
+ long threshold = context.getConfiguration().getLong(
+ "putsortreducer.row.threshold", 2L * (1<<30));
+ Iterator<Put> iter = puts.iterator();
+ while (iter.hasNext()) {
+ TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+ long curSize = 0;
+ // stop at the end or the RAM threshold
+ while (iter.hasNext() && curSize < threshold) {
+ Put p = iter.next();
+ for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+ for (KeyValue kv : kvs) {
+ map.add(kv);
+ curSize += kv.getValueLength();
+ }
}
}
- }
- context.setStatus("Read " + map.getClass());
- int index = 0;
- for (KeyValue kv : map) {
- context.write(row, kv);
- if (index > 0 && index % 100 == 0)
- context.setStatus("Wrote " + index);
+ context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+ + "(" + StringUtils.humanReadableInt(curSize) + ")");
+ int index = 0;
+ for (KeyValue kv : map) {
+ context.write(row, kv);
+ if (index > 0 && index % 100 == 0)
+ context.setStatus("Wrote " + index);
+ }
+
+ // if we have more entries to process
+ if (iter.hasNext()) {
+ // force flush because we cannot guarantee intra-row sorted order
+ context.write(null, null);
+ }
}
}
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Tue Oct 11 02:09:31 2011
@@ -49,8 +49,6 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.ZooKeeper;
-import com.google.common.base.Function;
-
/**
* Utility for {@link TableMapper} and {@link TableReducer}
*/
@@ -247,7 +245,6 @@ public class TableMapReduceUtil {
try {
addDependencyJars(job.getConfiguration(),
ZooKeeper.class,
- Function.class, // Guava collections
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
job.getOutputKeyClass(),
Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1181452&r1=1181451&r2=1181452&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Tue Oct 11 02:09:31 2011
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
@@ -64,7 +67,9 @@ import org.mockito.Mockito;
public class TestHFileOutputFormat {
private final static int ROWSPERSPLIT = 1024;
- private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+ private static final byte[][] FAMILIES
+ = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+ , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -118,9 +123,11 @@ public class TestHFileOutputFormat {
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
- KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
- PerformanceEvaluation.QUALIFIER_NAME, valBytes);
- context.write(key, kv);
+ for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+ KeyValue kv = new KeyValue(keyBytes, family,
+ PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ context.write(key, kv);
+ }
}
}
}
@@ -267,14 +274,14 @@ public class TestHFileOutputFormat {
try {
util.startMiniCluster();
HBaseAdmin admin = new HBaseAdmin(conf);
- HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
- int numRegions = util.createMultiRegions(
- util.getConfiguration(), table, FAMILY_NAME,
- startKeys);
- assertEquals("Should make 5 regions",
- numRegions, 5);
+ HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table",
0, util.countRows(table));
+ for (byte[] family : FAMILIES) {
+ int numRegions = util.createMultiRegions(
+ util.getConfiguration(), table, family, startKeys);
+ assertEquals("Should make 5 regions", numRegions, 5);
+ }
// Generate the bulk load files
util.startMiniMapReduceCluster();
@@ -283,12 +290,26 @@ public class TestHFileOutputFormat {
assertEquals("HFOF should not touch actual table",
0, util.countRows(table));
+ // Make sure that a directory was created for every CF
+ int dir = 0;
+ for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+ for (byte[] family : FAMILIES) {
+ if (Bytes.toString(family).equals(f.getPath().getName())) {
+ ++dir;
+ }
+ }
+ }
+ assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+
+ // handle the split case
if (shouldChangeRegions) {
LOG.info("Changing regions in table");
admin.disableTable(table.getTableName());
byte[][] newStartKeys = generateRandomStartKeys(15);
- util.createMultiRegions(util.getConfiguration(),
- table, FAMILY_NAME, newStartKeys);
+ for (byte[] family : FAMILIES) {
+ util.createMultiRegions(
+ util.getConfiguration(), table, family, newStartKeys);
+ }
admin.enableTable(table.getTableName());
while (table.getRegionsInfo().size() != 15 ||
!admin.isTableAvailable(table.getTableName())) {
@@ -304,6 +325,19 @@ public class TestHFileOutputFormat {
int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table",
expectedRows, util.countRows(table));
+ Scan scan = new Scan();
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ count++;
+ assertEquals(FAMILIES.length, res.raw().length);
+ KeyValue first = res.raw()[0];
+ for (KeyValue kv : res.raw()) {
+ assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
+ assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
+ }
+ }
+ results.close();
String tableDigestBefore = util.checksumRows(table);
// Cause regions to reopen
@@ -348,11 +382,13 @@ public class TestHFileOutputFormat {
util = new HBaseTestingUtility(conf);
if ("newtable".equals(args[0])) {
byte[] tname = args[1].getBytes();
- HTable table = util.createTable(tname, FAMILY_NAME);
+ HTable table = util.createTable(tname, FAMILIES);
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tname);
- util.createMultiRegions(conf, table, FAMILY_NAME,
- generateRandomStartKeys(5));
+ byte[][] startKeys = generateRandomStartKeys(5);
+ for (byte[] family : FAMILIES) {
+ util.createMultiRegions(conf, table, family, startKeys);
+ }
admin.enableTable(tname);
} else if ("incremental".equals(args[0])) {
byte[] tname = args[1].getBytes();