Posted to commits@hbase.apache.org by je...@apache.org on 2016/07/16 04:07:56 UTC
hbase git commit: HBASE-3727 MultiHFileOutputFormat (yi liang)
Repository: hbase
Updated Branches:
refs/heads/master 84b7010a3 -> 60847a2d7
HBASE-3727 MultiHFileOutputFormat (yi liang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/60847a2d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/60847a2d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/60847a2d
Branch: refs/heads/master
Commit: 60847a2d76163ff40df94a980e1bd3f837ff9d71
Parents: 84b7010
Author: Jerry He <je...@apache.org>
Authored: Fri Jul 15 21:07:12 2016 -0700
Committer: Jerry He <je...@apache.org>
Committed: Fri Jul 15 21:07:12 2016 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/HFileOutputFormat2.java | 409 ++++++++++---------
.../hbase/mapreduce/MultiHFileOutputFormat.java | 102 +++++
.../mapreduce/TestMultiHFileOutputFormat.java | 224 ++++++++++
3 files changed, 546 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/60847a2d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index b8a5475..22a73c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -131,224 +132,254 @@ public class HFileOutputFormat2
}
static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
- createRecordWriter(final TaskAttemptContext context)
- throws IOException {
-
- // Get the path of the temporary output file
- final Path outputPath = FileOutputFormat.getOutputPath(context);
- final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
- final Configuration conf = context.getConfiguration();
- final FileSystem fs = outputdir.getFileSystem(conf);
- // These configs. are from hbase-*.xml
- final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE);
- // Invented config. Add to hbase-*.xml if other than default compression.
- final String defaultCompressionStr = conf.get("hfile.compression",
- Compression.Algorithm.NONE.getName());
- final Algorithm defaultCompression = HFileWriterImpl
- .compressionByName(defaultCompressionStr);
- final boolean compactionExclude = conf.getBoolean(
- "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
- // create a map from column family to the compression algorithm
- final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
- final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
- final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
- String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
- final Map<byte[], DataBlockEncoding> datablockEncodingMap
- = createFamilyDataBlockEncodingMap(conf);
- final DataBlockEncoding overriddenEncoding;
- if (dataBlockEncodingStr != null) {
- overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
- } else {
- overriddenEncoding = null;
+ createRecordWriter(final TaskAttemptContext context) throws IOException {
+ return new HFileRecordWriter<V>(context, null);
+ }
+
+ protected static class HFileRecordWriter<V extends Cell>
+ extends RecordWriter<ImmutableBytesWritable, V> {
+ private final TaskAttemptContext context;
+ private final Path outputPath;
+ private final Path outputDir;
+ private final Configuration conf;
+ private final FileSystem fs;
+
+ private final long maxsize;
+
+ private final Algorithm defaultCompression;
+ private final boolean compactionExclude;
+
+ private final Map<byte[], Algorithm> compressionMap;
+ private final Map<byte[], BloomType> bloomTypeMap;
+ private final Map<byte[], Integer> blockSizeMap;
+
+ private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
+ private final DataBlockEncoding overriddenEncoding;
+
+ private final Map<byte[], WriterLength> writers;
+ private byte[] previousRow;
+ private final byte[] now;
+ private boolean rollRequested;
+
+ /**
+ * A mapreduce job will create a temp path for outputting results. If out != null, the
+ * caller has already set the temp working dir; if out == null, we need to set it here.
+ * Used by HFileOutputFormat2 and MultiHFileOutputFormat: MultiHFileOutputFormat hands us
+ * a temp working dir at the table level, while HFileOutputFormat2 sets it here within
+ * this constructor.
+ */
+ public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
+ throws IOException {
+ // Get the path of the temporary output file
+ context = taContext;
+
+ if (out == null) {
+ outputPath = FileOutputFormat.getOutputPath(context);
+ outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ } else {
+ outputPath = out;
+ outputDir = outputPath;
+ }
+
+ conf = context.getConfiguration();
+ fs = outputDir.getFileSystem(conf);
+
+ // These configs. are from hbase-*.xml
+ maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+ // Invented config. Add to hbase-*.xml if other than default compression.
+ String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
+ defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
+ compactionExclude =
+ conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+ // create a map from column family to the compression algorithm
+ compressionMap = createFamilyCompressionMap(conf);
+ bloomTypeMap = createFamilyBloomTypeMap(conf);
+ blockSizeMap = createFamilyBlockSizeMap(conf);
+
+ // Config for data block encoding
+ String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+ datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
+ if (dataBlockEncodingStr != null) {
+ overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+ } else {
+ overriddenEncoding = null;
+ }
+
+ writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
+ previousRow = HConstants.EMPTY_BYTE_ARRAY;
+ now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
+ rollRequested = false;
}
- return new RecordWriter<ImmutableBytesWritable, V>() {
- // Map of families to writers and how much has been output on the writer.
- private final Map<byte [], WriterLength> writers =
- new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
- private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
- private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
- private boolean rollRequested = false;
-
- @Override
- public void write(ImmutableBytesWritable row, V cell)
- throws IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
- // null input == user explicitly wants to flush
- if (row == null && kv == null) {
- rollWriters();
- return;
- }
+ @Override
+ public void write(ImmutableBytesWritable row, V cell) throws IOException {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- byte [] rowKey = CellUtil.cloneRow(kv);
- long length = kv.getLength();
- byte [] family = CellUtil.cloneFamily(kv);
- WriterLength wl = this.writers.get(family);
+ // null input == user explicitly wants to flush
+ if (row == null && kv == null) {
+ rollWriters();
+ return;
+ }
- // If this is a new column family, verify that the directory exists
- if (wl == null) {
- fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
- }
+ byte[] rowKey = CellUtil.cloneRow(kv);
+ long length = kv.getLength();
+ byte[] family = CellUtil.cloneFamily(kv);
+ WriterLength wl = this.writers.get(family);
- // If any of the HFiles for the column families has reached
- // maxsize, we need to roll all the writers
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
- }
+ // If this is a new column family, verify that the directory exists
+ if (wl == null) {
+ fs.mkdirs(new Path(outputDir, Bytes.toString(family)));
+ }
- // This can only happen once a row is finished though
- if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
- rollWriters();
- }
+ // If any of the HFiles for the column families has reached
+ // maxsize, we need to roll all the writers
+ if (wl != null && wl.written + length >= maxsize) {
+ this.rollRequested = true;
+ }
- // create a new WAL writer, if necessary
- if (wl == null || wl.writer == null) {
- if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
- HRegionLocation loc = null;
- String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
- if (tableName != null) {
- try (Connection connection = ConnectionFactory.createConnection(conf);
- RegionLocator locator =
- connection.getRegionLocator(TableName.valueOf(tableName))) {
- loc = locator.getRegionLocation(rowKey);
- } catch (Throwable e) {
- LOG.warn("there's something wrong when locating rowkey: " +
- Bytes.toString(rowKey), e);
- loc = null;
- }
+ // This can only happen once a row is finished though
+ if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ rollWriters();
+ }
+
+ // create a new WAL writer, if necessary
+ if (wl == null || wl.writer == null) {
+ if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+ HRegionLocation loc = null;
+ String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+ if (tableName != null) {
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
+ loc = locator.getRegionLocation(rowKey);
+ } catch (Throwable e) {
+ LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
+ e);
+ loc = null;
}
+ }
- if (null == loc) {
+ if (null == loc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "failed to get region location, so use default writer: " + Bytes.toString(rowKey));
+ }
+ wl = getNewWriter(family, conf, null);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+ }
+ InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
+ if (initialIsa.isUnresolved()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("failed to get region location, so use default writer: " +
- Bytes.toString(rowKey));
+ LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+ + loc.getPort() + ", so use default writer");
}
wl = getNewWriter(family, conf, null);
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
- }
- InetSocketAddress initialIsa =
- new InetSocketAddress(loc.getHostname(), loc.getPort());
- if (initialIsa.isUnresolved()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
- + loc.getPort() + ", so use default writer");
- }
- wl = getNewWriter(family, conf, null);
- } else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
- }
- wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+ LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
}
+ wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
}
- } else {
- wl = getNewWriter(family, conf, null);
}
+ } else {
+ wl = getNewWriter(family, conf, null);
}
+ }
- // we now have the proper WAL writer. full steam ahead
- kv.updateLatestStamp(this.now);
- wl.writer.append(kv);
- wl.written += length;
+ // we now have the proper WAL writer. full steam ahead
+ kv.updateLatestStamp(this.now);
+ wl.writer.append(kv);
+ wl.written += length;
- // Copy the row so we know when a row transition.
- this.previousRow = rowKey;
- }
+ // Copy the row so we know when a row transition.
+ this.previousRow = rowKey;
+ }
- private void rollWriters() throws IOException {
- for (WriterLength wl : this.writers.values()) {
- if (wl.writer != null) {
- LOG.info("Writer=" + wl.writer.getPath() +
- ((wl.written == 0)? "": ", wrote=" + wl.written));
- close(wl.writer);
- }
- wl.writer = null;
- wl.written = 0;
+ private void rollWriters() throws IOException {
+ for (WriterLength wl : this.writers.values()) {
+ if (wl.writer != null) {
+ LOG.info(
+ "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
+ close(wl.writer);
}
- this.rollRequested = false;
+ wl.writer = null;
+ wl.written = 0;
}
+ this.rollRequested = false;
+ }
- /* Create a new StoreFile.Writer.
- * @param family
- * @return A WriterLength, containing a new StoreFile.Writer.
- * @throws IOException
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
- justification="Not important")
- private WriterLength getNewWriter(byte[] family, Configuration conf,
- InetSocketAddress[] favoredNodes) throws IOException {
- WriterLength wl = new WriterLength();
- Path familydir = new Path(outputdir, Bytes.toString(family));
- Algorithm compression = compressionMap.get(family);
- compression = compression == null ? defaultCompression : compression;
- BloomType bloomType = bloomTypeMap.get(family);
- bloomType = bloomType == null ? BloomType.NONE : bloomType;
- Integer blockSize = blockSizeMap.get(family);
- blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
- DataBlockEncoding encoding = overriddenEncoding;
- encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
- encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
- Configuration tempConf = new Configuration(conf);
- tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- HFileContextBuilder contextBuilder = new HFileContextBuilder()
- .withCompression(compression)
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
- .withBlockSize(blockSize);
-
- if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
- contextBuilder.withIncludesTags(true);
- }
-
- contextBuilder.withDataBlockEncoding(encoding);
- HFileContext hFileContext = contextBuilder.build();
-
- if (null == favoredNodes) {
- wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
- .withOutputDir(familydir).withBloomType(bloomType)
- .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
- } else {
- wl.writer =
- new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
- .withOutputDir(familydir).withBloomType(bloomType)
- .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
- .withFavoredNodes(favoredNodes).build();
- }
+ /*
+ * Create a new StoreFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new StoreFile.Writer.
+ * @throws IOException
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
+ justification = "Not important")
+ private WriterLength getNewWriter(byte[] family, Configuration conf,
+ InetSocketAddress[] favoredNodes) throws IOException {
+ WriterLength wl = new WriterLength();
+ Path familyDir = new Path(outputDir, Bytes.toString(family));
+ Algorithm compression = compressionMap.get(family);
+ compression = compression == null ? defaultCompression : compression;
+ BloomType bloomType = bloomTypeMap.get(family);
+ bloomType = bloomType == null ? BloomType.NONE : bloomType;
+ Integer blockSize = blockSizeMap.get(family);
+ blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+ DataBlockEncoding encoding = overriddenEncoding;
+ encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+ encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+ Configuration tempConf = new Configuration(conf);
+ tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+ HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
+
+ if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+ contextBuilder.withIncludesTags(true);
+ }
- this.writers.put(family, wl);
- return wl;
+ contextBuilder.withDataBlockEncoding(encoding);
+ HFileContext hFileContext = contextBuilder.build();
+
+ if (null == favoredNodes) {
+ wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+ .withOutputDir(familyDir).withBloomType(bloomType)
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+ } else {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+ .withOutputDir(familyDir).withBloomType(bloomType)
+ .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+ .withFavoredNodes(favoredNodes).build();
}
- private void close(final StoreFileWriter w) throws IOException {
- if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
- Bytes.toBytes(System.currentTimeMillis()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
- Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
- Bytes.toBytes(true));
- w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
- Bytes.toBytes(compactionExclude));
- w.appendTrackedTimestampsToMetadata();
- w.close();
- }
+ this.writers.put(family, wl);
+ return wl;
+ }
+
+ private void close(final StoreFileWriter w) throws IOException {
+ if (w != null) {
+ w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
+ w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+ Bytes.toBytes(context.getTaskAttemptID().toString()));
+ w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
+ w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+ Bytes.toBytes(compactionExclude));
+ w.appendTrackedTimestampsToMetadata();
+ w.close();
}
+ }
- @Override
- public void close(TaskAttemptContext c)
- throws IOException, InterruptedException {
- for (WriterLength wl: this.writers.values()) {
- close(wl.writer);
- }
+ @Override
+ public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+ for (WriterLength wl : this.writers.values()) {
+ close(wl.writer);
}
- };
+ }
}
/*
@@ -503,7 +534,7 @@ public class HFileOutputFormat2
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
-
+
public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
IOException {
Configuration conf = job.getConfiguration();
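
A note on the refactor above: the record writer is now a named class whose working
directory can be injected. As a rough sketch (not part of this commit; taskContext,
workDir, rowKey and cell are placeholders, and same-package access is assumed since
HFileRecordWriter is protected), a caller could drive it directly:

    Path tableDir = new Path(workDir, "tableName1");
    HFileOutputFormat2.HFileRecordWriter<Cell> writer =
        new HFileOutputFormat2.HFileRecordWriter<Cell>(taskContext, tableDir);
    writer.write(new ImmutableBytesWritable(rowKey), cell); // hfiles land under tableDir/<family>/
    writer.close(taskContext);

MultiHFileOutputFormat below relies on exactly this: one HFileRecordWriter per table,
each rooted at its own table-level directory.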
http://git-wip-us.apache.org/repos/asf/hbase/blob/60847a2d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
new file mode 100644
index 0000000..7c1ebbc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+
+import com.google.common.annotations.VisibleForTesting;
+/**
+ * Creates a 3-level directory tree: the first level uses the table name as the parent
+ * directory, the second level uses the column family name as the child directory, and
+ * all HFiles for one family sit under that child directory:
+ * -tableName1
+ *   -columnFamilyName1
+ *   -columnFamilyName2
+ *     -HFiles
+ * -tableName2
+ *   -columnFamilyName1
+ *     -HFiles
+ *   -columnFamilyName2
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@VisibleForTesting
+public class MultiHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+ private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
+
+ @Override
+ public RecordWriter<ImmutableBytesWritable, Cell>
+ getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+ return createMultiHFileRecordWriter(context);
+ }
+
+ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+ createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
+
+ // Get the path of the output directory
+ final Path outputPath = FileOutputFormat.getOutputPath(context);
+ final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+ final Configuration conf = context.getConfiguration();
+ final FileSystem fs = outputDir.getFileSystem(conf);
+
+ // Map of tables to writers
+ final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters =
+ new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>>();
+
+ return new RecordWriter<ImmutableBytesWritable, V>() {
+ @Override
+ public void write(ImmutableBytesWritable tableName, V cell)
+ throws IOException, InterruptedException {
+ RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
+ // if this is a new table, make sure its output directory exists
+ if (tableWriter == null) {
+ // using table name as directory name
+ final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
+ fs.mkdirs(tableOutputDir);
+ LOG.info("Writing Table '" + tableName.toString() + "' data into the following directory: "
+ + tableOutputDir.toString());
+
+ // Create writer for one specific table
+ tableWriter = new HFileOutputFormat2.HFileRecordWriter<V>(context, tableOutputDir);
+ // Put table into map
+ tableWriters.put(tableName, tableWriter);
+ }
+ // Write <Row, Cell> into tableWriter; pass null for the row because
+ // HFileRecordWriter recovers the row key from the cell itself
+ tableWriter.write(null, cell);
+ }
+
+ @Override
+ public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+ for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
+ writer.close(c);
+ }
+ }
+ };
+ }
+}
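
A minimal job setup sketch for this output format, assuming a mapper that emits the
table name as an ImmutableBytesWritable key and a KeyValue as the value (the mapper and
reducer class names are placeholders; the test below shows concrete implementations):

    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "multi-table-bulkload");
    job.setMapperClass(MyTableKVMapper.class);              // emits <tableName, KeyValue>
    job.setMapOutputKeyClass(ImmutableBytesWritable.class); // key = table name
    job.setMapOutputValueClass(KeyValue.class);
    job.setReducerClass(MyTableKVSortReducer.class);        // sorts KeyValues per table
    job.setOutputFormatClass(MultiHFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/multi-hfile-out"));
    job.waitForCompletion(true);

The input format and the KeyValue serialization setup are omitted here; the test below
wires both.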
http://git-wip-us.apache.org/repos/asf/hbase/blob/60847a2d/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
new file mode 100644
index 0000000..738ae5f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test for {@link MultiHFileOutputFormat}. Sets up and runs a mapreduce job that creates
+ * the output directories and writes hfiles.
+ */
+@Category(MediumTests.class)
+public class TestMultiHFileOutputFormat {
+ private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
+
+ private HBaseTestingUtility util = new HBaseTestingUtility();
+
+ private static int ROWSPERSPLIT = 10;
+
+ private static final int KEYLEN_DEFAULT = 10;
+ private static final String KEYLEN_CONF = "randomkv.key.length";
+
+ private static final int VALLEN_DEFAULT = 10;
+ private static final String VALLEN_CONF = "randomkv.val.length";
+
+ private static final byte[][] TABLES =
+ { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
+ Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
+
+ private static final byte[][] FAMILIES =
+ { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+ Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+
+ private static final byte[] QUALIFIER = Bytes.toBytes("data");
+
+ public static void main(String[] args) throws Exception {
+ new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
+ }
+
+ /**
+ * Run a small MR job. This MR job will write HFiles into
+ * testWritingDataIntoHFiles/tableNames/columnFamilies/
+ */
+ @Test
+ public void testWritingDataIntoHFiles() throws Exception {
+ Configuration conf = util.getConfiguration();
+ util.startMiniCluster();
+ Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
+ FileSystem fs = testDir.getFileSystem(conf);
+ LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
+
+ // Set down this value or we OOME in eclipse.
+ conf.setInt("mapreduce.task.io.sort.mb", 20);
+ // Write a few files by setting max file size.
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+ try {
+ Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
+
+ FileOutputFormat.setOutputPath(job, testDir);
+
+ job.setInputFormatClass(NMapInputFormat.class);
+ job.setMapperClass(Random_TableKV_GeneratingMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.setReducerClass(Table_KeyValueSortReducer.class);
+ job.setOutputFormatClass(MultiHFileOutputFormat.class);
+ job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+
+ TableMapReduceUtil.addDependencyJars(job);
+ TableMapReduceUtil.initCredentials(job);
+ LOG.info("\nStarting test testWritingDataIntoHFiles\n");
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("\nWaiting on checking MapReduce output\n");
+ assertTrue(checkMROutput(fs, testDir, 0));
+ } finally {
+ testDir.getFileSystem(conf).delete(testDir, true);
+ util.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * MR will output a 3-level directory tree, tableName -> columnFamilyName -> HFile. This
+ * recursive method checks whether the created layout is correct; testDir should be small.
+ */
+ private boolean checkMROutput(FileSystem fs, Path testDir, int level)
+ throws FileNotFoundException, IOException {
+ if (level >= 3) {
+ return HFile.isHFileFormat(fs, testDir);
+ }
+ FileStatus[] fStats = fs.listStatus(testDir);
+ if (fStats == null || fStats.length <= 0) {
+ LOG.info("Created directory format is not correct");
+ return false;
+ }
+
+ for (FileStatus stats : fStats) {
+ // skip the _SUCCESS file created by MapReduce
+ if (level == 0 && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME))
+ continue;
+ if (level < 2 && !stats.isDirectory()) {
+ LOG.info("Created directory format is not correct");
+ return false;
+ }
+ boolean flag = checkMROutput(fs, stats.getPath(), level + 1);
+ if (!flag) return false;
+ }
+ return true;
+ }
+
+ /**
+ * Simple mapper that generates <TableName, KeyValue> output from no input data.
+ */
+ static class Random_TableKV_GeneratingMapper
+ extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
+
+ private int keyLength;
+ private int valLength;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+
+ Configuration conf = context.getConfiguration();
+ keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+ valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+ }
+
+ @Override
+ protected void map(NullWritable n1, NullWritable n2,
+ Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
+ throws java.io.IOException, InterruptedException {
+
+ byte keyBytes[] = new byte[keyLength];
+ byte valBytes[] = new byte[valLength];
+
+ ArrayList<ImmutableBytesWritable> tables = new ArrayList<ImmutableBytesWritable>();
+ for (int i = 0; i < TABLES.length; i++) {
+ tables.add(new ImmutableBytesWritable(TABLES[i]));
+ }
+
+ int taskId = context.getTaskAttemptID().getTaskID().getId();
+ assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
+ Random random = new Random();
+
+ for (int i = 0; i < ROWSPERSPLIT; i++) {
+ random.nextBytes(keyBytes);
+ // Ensure that unique tasks generate unique keys
+ keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+ random.nextBytes(valBytes);
+
+ for (ImmutableBytesWritable table : tables) {
+ for (byte[] family : FAMILIES) {
+ Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+ context.write(table, kv);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Simple reducer that takes <TableName, KeyValue> input with the KeyValues in no particular
+ * order, and emits <TableName, KeyValue> output with the KeyValues sorted.
+ */
+ static class Table_KeyValueSortReducer
+ extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+ protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
+ org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+ throws java.io.IOException, InterruptedException {
+ TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+ for (KeyValue kv : kvs) {
+ try {
+ map.add(kv.clone());
+ } catch (CloneNotSupportedException e) {
+ throw new java.io.IOException(e);
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv : map) {
+ context.write(table, kv);
+ if (++index % 100 == 0) context.setStatus("Wrote " + index);
+ }
+ }
+ }
+
+}
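
Since each table's hfiles end up under their own table-level subdirectory, a follow-up
bulk load runs once per table. A hedged sketch, assuming LoadIncrementalHFiles is pointed
at each table directory in turn (table names and outputDir are illustrative; exception
handling elided):

    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      for (String name : new String[] { "tableName1", "tableName2" }) {
        TableName tn = TableName.valueOf(name);
        try (Table table = conn.getTable(tn);
             RegionLocator locator = conn.getRegionLocator(tn)) {
          // load the hfiles under <outputDir>/<tableName>/<family>/ into the live table
          new LoadIncrementalHFiles(conf).doBulkLoad(new Path(outputDir, name), admin, table, locator);
        }
      }
    }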