You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2010/12/08 07:37:06 UTC
svn commit: r1043318 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Author: nspiegelberg
Date: Wed Dec 8 06:37:06 2010
New Revision: 1043318
URL: http://svn.apache.org/viewvc?rev=1043318&view=rev
Log:
HBASE-1861 : Multi-Family support for bulk upload tools
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1043318&r1=1043317&r2=1043318&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Wed Dec 8 06:37:06 2010
@@ -58,7 +58,8 @@ import org.apache.commons.logging.LogFac
* Currently, can only write files to a single column family at a
* time. Multiple column families requires coordinating keys cross family.
* Writes current time as the sequence id for the file. Sets the major compacted
- * attribute on created hfiles.
+ * attribute on created hfiles. Calling write(null,null) will forcibly roll
+ * all HFiles being written.
* @see KeyValueSortReducer
*/
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
@@ -72,9 +73,10 @@ public class HFileOutputFormat extends F
Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
- final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
- final int blocksize =
- conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
+ final long maxsize = conf.getLong("hbase.hregion.max.filesize",
+ HConstants.DEFAULT_MAX_FILE_SIZE);
+ final int blocksize = conf.getInt("hfile.min.blocksize.size",
+ HFile.DEFAULT_BLOCKSIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String compression = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
@@ -85,48 +87,77 @@ public class HFileOutputFormat extends F
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
+ private boolean rollRequested = false;
public void write(ImmutableBytesWritable row, KeyValue kv)
throws IOException {
+ // null input == user explicitly wants to flush
+ if (row == null && kv == null) {
+ rollWriters();
+ return;
+ }
+
+ byte [] rowKey = kv.getRow();
long length = kv.getLength();
byte [] family = kv.getFamily();
WriterLength wl = this.writers.get(family);
- if (wl == null || ((length + wl.written) >= maxsize) &&
- Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
- kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0) {
- // Get a new writer.
- Path basedir = new Path(outputdir, Bytes.toString(family));
- if (wl == null) {
- wl = new WriterLength();
- this.writers.put(family, wl);
- if (this.writers.size() > 1) throw new IOException("One family only");
- // If wl == null, first file in family. Ensure family dir exits.
- if (!fs.exists(basedir)) fs.mkdirs(basedir);
- }
- wl.writer = getNewWriter(wl.writer, basedir);
- LOG.info("Writer=" + wl.writer.getPath() +
- ((wl.written == 0)? "": ", wrote=" + wl.written));
- wl.written = 0;
+
+ // If this is a new column family, verify that the directory exists
+ if (wl == null) {
+ fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
}
+
+ // If any of the HFiles for the column families has reached
+ // maxsize, we need to roll all the writers
+ if (wl != null && wl.written + length >= maxsize) {
+ this.rollRequested = true;
+ }
+
+ // This can only happen once a row is finished though
+ if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ rollWriters();
+ }
+
+ // create a new HFile writer, if necessary
+ if (wl == null || wl.writer == null) {
+ wl = getNewWriter(family);
+ }
+
+ // We now have the proper HFile writer. Full steam ahead.
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
+
// Copy the row so we know when a row transition.
- this.previousRow = kv.getRow();
+ this.previousRow = rowKey;
}
- /* Create a new HFile.Writer. Close current if there is one.
- * @param writer
- * @param familydir
- * @return A new HFile.Writer.
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info("Writer=" + wl.writer.getPath() +
+ ((wl.written == 0)? "": ", wrote=" + wl.written));
+ close(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+ this.rollRequested = false;
+ }
+
+ /* Create a new HFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new HFile.Writer.
* @throws IOException
*/
- private HFile.Writer getNewWriter(final HFile.Writer writer,
- final Path familydir)
- throws IOException {
- close(writer);
- return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
- blocksize, compression, KeyValue.KEY_COMPARATOR);
+ private WriterLength getNewWriter(byte[] family) throws IOException {
+ WriterLength wl = new WriterLength();
+ Path familydir = new Path(outputdir, Bytes.toString(family));
+ wl.writer = new HFile.Writer(fs,
+ StoreFile.getUniqueFile(fs, familydir), blocksize,
+ compression, KeyValue.KEY_COMPARATOR);
+ this.writers.put(family, wl);
+ return wl;
}
private void close(final HFile.Writer w) throws IOException {
@@ -143,8 +174,8 @@ public class HFileOutputFormat extends F
public void close(TaskAttemptContext c)
throws IOException, InterruptedException {
- for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
- close(e.getValue().writer);
+ for (WriterLength wl: this.writers.values()) {
+ close(wl.writer);
}
}
};
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java?rev=1043318&r1=1043317&r2=1043318&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java Wed Dec 8 06:37:06 2010
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
/**
* Emits sorted Puts.
@@ -46,21 +48,37 @@ public class PutSortReducer extends
ImmutableBytesWritable, KeyValue>.Context context)
throws java.io.IOException, InterruptedException
{
- TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-
- for (Put p : puts) {
- for (List<KeyValue> kvs : p.getFamilyMap().values()) {
- for (KeyValue kv : kvs) {
- map.add(kv.clone());
+ // although reduce() is called per-row, handle pathological case
+ long threshold = context.getConfiguration().getLong(
+ "putsortreducer.row.threshold", 2L * (1<<30));
+ Iterator<Put> iter = puts.iterator();
+ while (iter.hasNext()) {
+ TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+ long curSize = 0;
+ // stop at the end or the RAM threshold
+ while (iter.hasNext() && curSize < threshold) {
+ Put p = iter.next();
+ for (List<KeyValue> kvs : p.getFamilyMap().values()) {
+ for (KeyValue kv : kvs) {
+ map.add(kv);
+ curSize += kv.getValueLength();
+ }
}
}
- }
- context.setStatus("Read " + map.getClass());
- int index = 0;
- for (KeyValue kv : map) {
- context.write(row, kv);
- if (index > 0 && index % 100 == 0)
- context.setStatus("Wrote " + index);
+ context.setStatus("Read " + map.size() + " entries of " + map.getClass()
+ + "(" + StringUtils.humanReadableInt(curSize) + ")");
+ int index = 0;
+ for (KeyValue kv : map) {
+ context.write(row, kv);
+ if (index > 0 && index % 100 == 0)
+ context.setStatus("Wrote " + index);
+ }
+
+ // if we have more entries to process
+ if (iter.hasNext()) {
+ // force flush because we cannot guarantee intra-row sorted order
+ context.write(null, null);
+ }
}
}
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1043318&r1=1043317&r2=1043318&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Wed Dec 8 06:37:06 2010
@@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -65,7 +68,9 @@ import org.mockito.Mockito;
public class TestHFileOutputFormat {
private final static int ROWSPERSPLIT = 1024;
- private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
+ private static final byte[][] FAMILIES
+ = { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A"))
+ , Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B"))};
private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");
private HBaseTestingUtility util = new HBaseTestingUtility();
@@ -119,9 +124,11 @@ public class TestHFileOutputFormat {
random.nextBytes(valBytes);
ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
- KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
- PerformanceEvaluation.QUALIFIER_NAME, valBytes);
- context.write(key, kv);
+ for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+ KeyValue kv = new KeyValue(keyBytes, family,
+ PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+ context.write(key, kv);
+ }
}
}
}
@@ -268,14 +275,12 @@ public class TestHFileOutputFormat {
try {
util.startMiniCluster();
HBaseAdmin admin = new HBaseAdmin(conf);
- HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
- int numRegions = util.createMultiRegions(
- util.getConfiguration(), table, FAMILY_NAME,
- startKeys);
- assertEquals("Should make 5 regions",
- numRegions, 5);
+ HTable table = util.createTable(TABLE_NAME, FAMILIES);
assertEquals("Should start with empty table",
0, util.countRows(table));
+ int numRegions = util.createMultiRegions(
+ util.getConfiguration(), table, FAMILIES[0], startKeys);
+ assertEquals("Should make 5 regions", numRegions, 5);
// Generate the bulk load files
util.startMiniMapReduceCluster();
@@ -284,6 +289,19 @@ public class TestHFileOutputFormat {
assertEquals("HFOF should not touch actual table",
0, util.countRows(table));
+
+ // Make sure that a directory was created for every CF
+ int dir = 0;
+ for (FileStatus f : testDir.getFileSystem(conf).listStatus(testDir)) {
+ for (byte[] family : FAMILIES) {
+ if (Bytes.toString(family).equals(f.getPath().getName())) {
+ ++dir;
+ }
+ }
+ }
+ assertEquals("Column family not found in FS.", FAMILIES.length, dir);
+
+ // handle the split case
if (shouldChangeRegions) {
LOG.info("Changing regions in table");
admin.disableTable(table.getTableName());
@@ -293,8 +311,8 @@ public class TestHFileOutputFormat {
LOG.info("Waiting on table to finish disabling");
}
byte[][] newStartKeys = generateRandomStartKeys(15);
- util.createMultiRegions(util.getConfiguration(),
- table, FAMILY_NAME, newStartKeys);
+ util.createMultiRegions(
+ util.getConfiguration(), table, FAMILIES[0], newStartKeys);
admin.enableTable(table.getTableName());
while (table.getRegionsInfo().size() != 15 ||
!admin.isTableAvailable(table.getTableName())) {
@@ -310,6 +328,19 @@ public class TestHFileOutputFormat {
int expectedRows = conf.getInt("mapred.map.tasks", 1) * ROWSPERSPLIT;
assertEquals("LoadIncrementalHFiles should put expected data in table",
expectedRows, util.countRows(table));
+ Scan scan = new Scan();
+ ResultScanner results = table.getScanner(scan);
+ int count = 0;
+ for (Result res : results) {
+ count++;
+ assertEquals(FAMILIES.length, res.raw().length);
+ KeyValue first = res.raw()[0];
+ for (KeyValue kv : res.raw()) {
+ assertTrue(KeyValue.COMPARATOR.matchingRows(first, kv));
+ assertTrue(Bytes.equals(first.getValue(), kv.getValue()));
+ }
+ }
+ results.close();
String tableDigestBefore = util.checksumRows(table);
// Cause regions to reopen
@@ -351,11 +382,11 @@ public class TestHFileOutputFormat {
util = new HBaseTestingUtility(conf);
if ("newtable".equals(args[0])) {
byte[] tname = args[1].getBytes();
- HTable table = util.createTable(tname, FAMILY_NAME);
+ HTable table = util.createTable(tname, FAMILIES);
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tname);
- util.createMultiRegions(conf, table, FAMILY_NAME,
- generateRandomStartKeys(5));
+ byte[][] startKeys = generateRandomStartKeys(5);
+ util.createMultiRegions(conf, table, FAMILIES[0], startKeys);
admin.enableTable(tname);
} else if ("incremental".equals(args[0])) {
byte[] tname = args[1].getBytes();