You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/08/01 20:18:21 UTC
svn commit: r1509361 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: liyin
Date: Thu Aug 1 18:18:20 2013
New Revision: 1509361
URL: http://svn.apache.org/r1509361
Log:
[master] Support bulk-loading incremental HFiles from a task's work subdirectory
Author: fan
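Summary: Support bulk-loading incremental HFiles from a task's work subdirectory. HFileOutputFormat now records the most recently created RecordWriter (latestWriter), so a task can flush its HFiles and load them with LoadIncrementalHFiles during its own cleanup.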
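Test Plan: Added TestHFileOutputFormat#testUploadByTask, a unit test in which each map task bulk-loads the HFiles in its work subdirectory during cleanup.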
Reviewers: adela, liyintang, manukranthk, aaiyer
Reviewed By: liyintang
CC: hbase-eng@, vinodv
Differential Revision: https://phabricator.fb.com/D896850
Task ID: 2633302
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Thu Aug 1 18:18:20 2013
@@ -21,15 +21,12 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -58,13 +55,12 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.hbase.master.RegionPlacement;
-import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -93,6 +89,8 @@ public class HFileOutputFormat extends F
static final String TABLE_NAME = "hbase.hfileoutputformat.tablename";
static final String UTF8 = "UTF-8";
+ /**
+ * The RecordWriter most recently returned by getRecordWriter in this JVM
+ * (assigned just before it is returned). Exposed so task code can close the
+ * writer early to flush its HFiles, e.g. from Mapper.cleanup, before bulk
+ * loading them.
+ * NOTE(review): this is a single static slot that every getRecordWriter call
+ * overwrites, so if more than one task creates a writer in the same JVM it may
+ * not refer to the caller's writer. It also keeps the writer reachable after
+ * the task finishes. Consider a test-only hook instead.
+ */
+ protected static RecordWriter<ImmutableBytesWritable, KeyValue> latestWriter = null;
+
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
throws IOException, InterruptedException {
// Get the path of the temporary output file
@@ -100,6 +98,7 @@ public class HFileOutputFormat extends F
final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
+
// These configs. are from hbase-*.xml
final long maxsize = conf.getLong("hbase.hregion.max.filesize",
HConstants.DEFAULT_MAX_FILE_SIZE);
@@ -125,7 +124,8 @@ public class HFileOutputFormat extends F
final Pair<byte[][], byte[][]> startKeysAndFavoredNodes =
(table == null ? null : table.getStartKeysAndFavoredNodes());
- return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
+ RecordWriter<ImmutableBytesWritable, KeyValue> writer =
+ new RecordWriter<ImmutableBytesWritable, KeyValue>() {
// Map of families to writers and how much has been output on the writer.
private final Map<byte [], WriterLength> writers =
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
@@ -266,6 +266,7 @@ public class HFileOutputFormat extends F
*/
w.appendMetadata(HConstants.NO_MIN_FLUSH_TIME, 0, false);
w.close();
+
}
}
@@ -276,6 +277,9 @@ public class HFileOutputFormat extends F
}
}
};
+
+ latestWriter = writer;
+ return writer;
}
/*
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Thu Aug 1 18:18:20 2013
@@ -74,10 +74,13 @@ public class LoadIncrementalHFiles exten
public static String EXIT_ON_FIRST_FAILURE = "hbase.mapreduce.bulkload.failure.exitOnFirst";
private boolean exitOnFirstFailure;
+ // NOTE(review): super(conf) presumably already stores this Configuration
+ // (exposed via getConf() if the superclass is Hadoop's Configured). This copy
+ // is not refreshed by a later setConf(). The no-arg constructor's body is not
+ // shown in this diff; if it does not assign this field, the field stays null
+ // on that path. No read of this field appears in this diff -- confirm callers
+ // could use getConf() instead.
+ private Configuration conf;
+
+ /**
+ * Creates a bulk loader from the given configuration. It reads ASSIGN_SEQ_IDS
+ * and EXIT_ON_FIRST_FAILURE, both defaulting to true when unset.
+ * @param conf configuration passed to the superclass and kept in {@code this.conf}
+ */
public LoadIncrementalHFiles(Configuration conf) {
super(conf);
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
exitOnFirstFailure = conf.getBoolean(EXIT_ON_FIRST_FAILURE, true);
+ this.conf = conf;
}
public LoadIncrementalHFiles() {
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java?rev=1509361&r1=1509360&r2=1509361&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java Thu Aug 1 18:18:20 2013
@@ -26,16 +26,15 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Map.Entry;
+import java.util.Random;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -46,13 +45,12 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
@@ -65,18 +63,14 @@ import org.apache.hadoop.hbase.io.Immuta
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.master.AssignmentPlan;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.master.RegionPlacement;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -92,8 +86,6 @@ import org.apache.hadoop.mapreduce.lib.o
import org.junit.Test;
import org.mockito.Mockito;
-import com.google.common.collect.Lists;
-
/**
* Simple test for {@link KeyValueSortReducer} and {@link HFileOutputFormat}.
* Sets up and runs a mapreduce job that writes hfile output.
@@ -253,6 +245,50 @@ public class TestHFileOutputFormat {
}
}
+  /**
+   * Map-only test mapper. Each map() call emits one KeyValue per column family
+   * in {@link TestHFileOutputFormat#FAMILIES}, with row key {@link #testKey}
+   * followed by the task ID string, so every task writes its own row. In
+   * {@link #cleanup} the task flushes its HFiles and bulk-loads them from its
+   * work output directory into {@code TABLE_NAME}.
+   */
+  static class SimpleKVMapper
+      extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue> {
+
+    /** Row-key prefix; the task ID string is appended to it. */
+    static final String testKey = "testKey";
+    /** Value written under every column family. */
+    static final String testValue = "testValue";
+
+    @Override
+    protected void map(NullWritable n1, NullWritable n2,
+        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, KeyValue>.Context context)
+        throws IOException, InterruptedException {
+      int taskId = context.getTaskAttemptID().getTaskID().getId();
+      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+
+      String taskIdString = context.getTaskAttemptID().getTaskID().toString();
+      byte[] keyBytes = Bytes.toBytes(testKey + taskIdString);
+      byte[] valBytes = Bytes.toBytes(testValue);
+      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
+
+      for (byte[] family : TestHFileOutputFormat.FAMILIES) {
+        KeyValue kv = new KeyValue(keyBytes, family,
+            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
+        context.write(key, kv);
+      }
+    }
+
+    /**
+     * Closes the RecordWriter most recently created by HFileOutputFormat, which
+     * flushes its HFiles into this task's work directory. It then bulk-loads
+     * that directory into the test table.
+     *
+     * @throws IllegalStateException if HFileOutputFormat has not created a writer
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      Path outputPath = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = outputPath.getFileSystem(conf);
+      Path workOutputPath = FileOutputFormat.getWorkOutputPath(context).makeQualified(fs);
+
+      // Fail with a clear message rather than an NPE if no writer was created.
+      if (HFileOutputFormat.latestWriter == null) {
+        throw new IllegalStateException(
+            "HFileOutputFormat.latestWriter is null; no RecordWriter was created");
+      }
+      // Force flushing HFile into working directory
+      HFileOutputFormat.latestWriter.close(context);
+
+      // Release the table handle even if the bulk load fails.
+      HTable table = new HTable(conf, TABLE_NAME);
+      try {
+        new LoadIncrementalHFiles(conf).doBulkLoad(workOutputPath, table);
+      } finally {
+        table.close();
+      }
+    }
+  }
+
/**
* Test for the union style MR jobs that runs both Put and Delete requests
* @throws Exception on job, sorting, IO or fs errors
@@ -621,6 +657,55 @@ public class TestHFileOutputFormat {
}
/**
+   * Tests uploading map output during the cleanup stage of each task. The test
+   * runs a map-only job whose {@link SimpleKVMapper} bulk-loads its own HFiles
+   * in cleanup. It then checks that the table holds one row per map task, and
+   * that every column family carries the expected key prefix and value.
+   * @throws Exception on mini-cluster, job, IO, or assertion failures
+   */
+  @Test
+  public void testUploadByTask() throws Exception {
+    try {
+      util.startMiniCluster();
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      Configuration conf = table.getConfiguration();
+      Path testDir = util.getTestDir("testUploadByTask");
+
+      // Generate the bulk load files. Shut the MR cluster down only if it was
+      // actually started, so a startup failure is not masked by the shutdown.
+      util.startMiniMapReduceCluster();
+      try {
+        Job job = new Job(conf, "testUploadByTask");
+        job.setInputFormatClass(NMapInputFormat.class);
+        job.setMapperClass(SimpleKVMapper.class);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.setOutputFormatClass(HFileOutputFormat.class);
+        job.setNumReduceTasks(0);
+        HFileOutputFormat.configAsMapOutputFormat(job, table);
+        FileOutputFormat.setOutputPath(job, testDir);
+        assertTrue(job.waitForCompletion(true));
+
+        // Ensure data shows up
+        int expectedRows = conf.getInt("mapred.map.tasks", 1);
+        assertEquals("LoadIncrementalHFiles should put a row per task",
+            expectedRows, util.countRows(table));
+        ResultScanner results = table.getScanner(new Scan());
+        try {
+          for (Result res : results) {
+            assertEquals(FAMILIES.length, res.raw().length);
+            for (KeyValue kv : res.raw()) {
+              assertTrue("Key should start with pre-defined test key",
+                  Bytes.toStringBinary(kv.getRow()).startsWith(SimpleKVMapper.testKey));
+              assertEquals("Value should equal to pre-defined value",
+                  SimpleKVMapper.testValue, Bytes.toString(kv.getValue()));
+            }
+          }
+        } finally {
+          // Close the scanner even when an assertion above fails.
+          results.close();
+        }
+      } finally {
+        util.shutdownMiniMapReduceCluster();
+      }
+    } finally {
+      util.shutdownMiniCluster();
+    }
+  }
+
+ /**
* Test for
* {@link HFileOutputFormat#createFamilyCompressionMap(Configuration)}. Tests
* that the compression map is correctly deserialized from configuration