You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2016/07/16 04:07:56 UTC

hbase git commit: HBASE-3727 MultiHFileOutputFormat (yi liang)

Repository: hbase
Updated Branches:
  refs/heads/master 84b7010a3 -> 60847a2d7


HBASE-3727 MultiHFileOutputFormat (yi liang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/60847a2d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/60847a2d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/60847a2d

Branch: refs/heads/master
Commit: 60847a2d76163ff40df94a980e1bd3f837ff9d71
Parents: 84b7010
Author: Jerry He <je...@apache.org>
Authored: Fri Jul 15 21:07:12 2016 -0700
Committer: Jerry He <je...@apache.org>
Committed: Fri Jul 15 21:07:12 2016 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/HFileOutputFormat2.java     | 409 ++++++++++---------
 .../hbase/mapreduce/MultiHFileOutputFormat.java | 102 +++++
 .../mapreduce/TestMultiHFileOutputFormat.java   | 224 ++++++++++
 3 files changed, 546 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/60847a2d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index b8a5475..22a73c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -131,224 +132,254 @@ public class HFileOutputFormat2
   }
 
   static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
-      createRecordWriter(final TaskAttemptContext context)
-          throws IOException {
-
-    // Get the path of the temporary output file
-    final Path outputPath = FileOutputFormat.getOutputPath(context);
-    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
-    final Configuration conf = context.getConfiguration();
-    final FileSystem fs = outputdir.getFileSystem(conf);
-    // These configs. are from hbase-*.xml
-    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
-        HConstants.DEFAULT_MAX_FILE_SIZE);
-    // Invented config.  Add to hbase-*.xml if other than default compression.
-    final String defaultCompressionStr = conf.get("hfile.compression",
-        Compression.Algorithm.NONE.getName());
-    final Algorithm defaultCompression = HFileWriterImpl
-        .compressionByName(defaultCompressionStr);
-    final boolean compactionExclude = conf.getBoolean(
-        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
-
-    // create a map from column family to the compression algorithm
-    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
-    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
-    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
-    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-    final Map<byte[], DataBlockEncoding> datablockEncodingMap
-        = createFamilyDataBlockEncodingMap(conf);
-    final DataBlockEncoding overriddenEncoding;
-    if (dataBlockEncodingStr != null) {
-      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
-    } else {
-      overriddenEncoding = null;
+  createRecordWriter(final TaskAttemptContext context) throws IOException {
+    return new HFileRecordWriter<V>(context, null);
+  }
+
+  protected static class HFileRecordWriter<V extends Cell>
+      extends RecordWriter<ImmutableBytesWritable, V> {
+    private final TaskAttemptContext context;
+    private final Path outputPath;
+    private final Path outputDir;
+    private final Configuration conf;
+    private final FileSystem fs;
+
+    private final long maxsize;
+
+    private final Algorithm defaultCompression;
+    private final boolean compactionExclude;
+
+    private final Map<byte[], Algorithm> compressionMap;
+    private final Map<byte[], BloomType> bloomTypeMap;
+    private final Map<byte[], Integer> blockSizeMap;
+
+    private final Map<byte[], DataBlockEncoding> datablockEncodingMap;
+    private final DataBlockEncoding overriddenEncoding;
+
+    private final Map<byte[], WriterLength> writers;
+    private byte[] previousRow;
+    private final byte[] now;
+    private boolean rollRequested;
+
+    /**
+     * MapReduce job will create a temp path for outputting results. If out != null, it means that
+     * the caller has set the temp working dir; If out == null, it means we need to set it here.
+     * Used by HFileOutputFormat2 and MultiHFileOutputFormat. MultiHFileOutputFormat will give us
+     * temp working dir at the table level and HFileOutputFormat2 has to set it here within this
+     * constructor.
+     */
+    public HFileRecordWriter(final TaskAttemptContext taContext, final Path out)
+        throws IOException {
+      // Get the path of the temporary output file
+      context = taContext;
+
+      if (out == null) {
+        outputPath = FileOutputFormat.getOutputPath(context);
+        outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+      } else {
+        outputPath = out;
+        outputDir = outputPath;
+      }
+
+      conf = context.getConfiguration();
+      fs = outputDir.getFileSystem(conf);
+
+      // These configs. are from hbase-*.xml
+      maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
+
+      // Invented config. Add to hbase-*.xml if other than default compression.
+      String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
+      defaultCompression = HFileWriterImpl.compressionByName(defaultCompressionStr);
+      compactionExclude =
+          conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+      // create a map from column family to the compression algorithm
+      compressionMap = createFamilyCompressionMap(conf);
+      bloomTypeMap = createFamilyBloomTypeMap(conf);
+      blockSizeMap = createFamilyBlockSizeMap(conf);
+
+      // Config for data block encoding
+      String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+      datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
+      if (dataBlockEncodingStr != null) {
+        overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+      } else {
+        overriddenEncoding = null;
+      }
+
+      writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
+      previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
+      rollRequested = false;
     }
 
-    return new RecordWriter<ImmutableBytesWritable, V>() {
-      // Map of families to writers and how much has been output on the writer.
-      private final Map<byte [], WriterLength> writers =
-        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
-      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
-      private boolean rollRequested = false;
-
-      @Override
-      public void write(ImmutableBytesWritable row, V cell)
-          throws IOException {
-        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-
-        // null input == user explicitly wants to flush
-        if (row == null && kv == null) {
-          rollWriters();
-          return;
-        }
+    @Override
+    public void write(ImmutableBytesWritable row, V cell) throws IOException {
+      KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
 
-        byte [] rowKey = CellUtil.cloneRow(kv);
-        long length = kv.getLength();
-        byte [] family = CellUtil.cloneFamily(kv);
-        WriterLength wl = this.writers.get(family);
+      // null input == user explicitly wants to flush
+      if (row == null && kv == null) {
+        rollWriters();
+        return;
+      }
 
-        // If this is a new column family, verify that the directory exists
-        if (wl == null) {
-          fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
-        }
+      byte[] rowKey = CellUtil.cloneRow(kv);
+      long length = kv.getLength();
+      byte[] family = CellUtil.cloneFamily(kv);
+      WriterLength wl = this.writers.get(family);
 
-        // If any of the HFiles for the column families has reached
-        // maxsize, we need to roll all the writers
-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }
+      // If this is a new column family, verify that the directory exists
+      if (wl == null) {
+        fs.mkdirs(new Path(outputDir, Bytes.toString(family)));
+      }
 
-        // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-          rollWriters();
-        }
+      // If any of the HFiles for the column families has reached
+      // maxsize, we need to roll all the writers
+      if (wl != null && wl.written + length >= maxsize) {
+        this.rollRequested = true;
+      }
 
-        // create a new WAL writer, if necessary
-        if (wl == null || wl.writer == null) {
-          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
-            HRegionLocation loc = null;
-            String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
-            if (tableName != null) {
-              try (Connection connection = ConnectionFactory.createConnection(conf);
-                     RegionLocator locator =
-                       connection.getRegionLocator(TableName.valueOf(tableName))) {
-                loc = locator.getRegionLocation(rowKey);
-              } catch (Throwable e) {
-                LOG.warn("there's something wrong when locating rowkey: " +
-                  Bytes.toString(rowKey), e);
-                loc = null;
-              }
+      // This can only happen once a row is finished though
+      if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+        rollWriters();
+      }
+
+      // create a new HFile writer, if necessary
+      if (wl == null || wl.writer == null) {
+        if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+          HRegionLocation loc = null;
+          String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+          if (tableName != null) {
+            try (Connection connection = ConnectionFactory.createConnection(conf);
+                RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
+              loc = locator.getRegionLocation(rowKey);
+            } catch (Throwable e) {
+              LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey),
+                  e);
+              loc = null;
             }
+          }
 
-            if (null == loc) {
+          if (null == loc) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace(
+                  "failed to get region location, so use default writer: " + Bytes.toString(rowKey));
+            }
+            wl = getNewWriter(family, conf, null);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+            }
+            InetSocketAddress initialIsa = new InetSocketAddress(loc.getHostname(), loc.getPort());
+            if (initialIsa.isUnresolved()) {
               if (LOG.isTraceEnabled()) {
-                LOG.trace("failed to get region location, so use default writer: " +
-                  Bytes.toString(rowKey));
+                LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+                    + loc.getPort() + ", so use default writer");
               }
               wl = getNewWriter(family, conf, null);
             } else {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
-              }
-              InetSocketAddress initialIsa =
-                  new InetSocketAddress(loc.getHostname(), loc.getPort());
-              if (initialIsa.isUnresolved()) {
-                if (LOG.isTraceEnabled()) {
-                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
-                      + loc.getPort() + ", so use default writer");
-                }
-                wl = getNewWriter(family, conf, null);
-              } else {
-                if(LOG.isDebugEnabled()) {
-                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
-                }
-                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
+                LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
               }
+              wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
             }
-          } else {
-            wl = getNewWriter(family, conf, null);
           }
+        } else {
+          wl = getNewWriter(family, conf, null);
         }
+      }
 
-        // we now have the proper WAL writer. full steam ahead
-        kv.updateLatestStamp(this.now);
-        wl.writer.append(kv);
-        wl.written += length;
+      // we now have the proper HFile writer. full steam ahead
+      kv.updateLatestStamp(this.now);
+      wl.writer.append(kv);
+      wl.written += length;
 
-        // Copy the row so we know when a row transition.
-        this.previousRow = rowKey;
-      }
+      // Copy the row so we know when a row transition occurs.
+      this.previousRow = rowKey;
+    }
 
-      private void rollWriters() throws IOException {
-        for (WriterLength wl : this.writers.values()) {
-          if (wl.writer != null) {
-            LOG.info("Writer=" + wl.writer.getPath() +
-                ((wl.written == 0)? "": ", wrote=" + wl.written));
-            close(wl.writer);
-          }
-          wl.writer = null;
-          wl.written = 0;
+    private void rollWriters() throws IOException {
+      for (WriterLength wl : this.writers.values()) {
+        if (wl.writer != null) {
+          LOG.info(
+              "Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
+          close(wl.writer);
         }
-        this.rollRequested = false;
+        wl.writer = null;
+        wl.written = 0;
       }
+      this.rollRequested = false;
+    }
 
-      /* Create a new StoreFile.Writer.
-       * @param family
-       * @return A WriterLength, containing a new StoreFile.Writer.
-       * @throws IOException
-       */
-      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
-          justification="Not important")
-      private WriterLength getNewWriter(byte[] family, Configuration conf,
-          InetSocketAddress[] favoredNodes) throws IOException {
-        WriterLength wl = new WriterLength();
-        Path familydir = new Path(outputdir, Bytes.toString(family));
-        Algorithm compression = compressionMap.get(family);
-        compression = compression == null ? defaultCompression : compression;
-        BloomType bloomType = bloomTypeMap.get(family);
-        bloomType = bloomType == null ? BloomType.NONE : bloomType;
-        Integer blockSize = blockSizeMap.get(family);
-        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
-        DataBlockEncoding encoding = overriddenEncoding;
-        encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
-        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-        Configuration tempConf = new Configuration(conf);
-        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-        HFileContextBuilder contextBuilder = new HFileContextBuilder()
-                                    .withCompression(compression)
-                                    .withChecksumType(HStore.getChecksumType(conf))
-                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-                                    .withBlockSize(blockSize);
-
-        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
-          contextBuilder.withIncludesTags(true);
-        }
-
-        contextBuilder.withDataBlockEncoding(encoding);
-        HFileContext hFileContext = contextBuilder.build();
-                                    
-        if (null == favoredNodes) {
-          wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
-        } else {
-          wl.writer =
-              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
-                  .withOutputDir(familydir).withBloomType(bloomType)
-                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
-                  .withFavoredNodes(favoredNodes).build();
-        }
+    /*
+     * Create a new StoreFile.Writer.
+     * @param family
+     * @return A WriterLength, containing a new StoreFile.Writer.
+     * @throws IOException
+     */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED",
+        justification = "Not important")
+    private WriterLength getNewWriter(byte[] family, Configuration conf,
+        InetSocketAddress[] favoredNodes) throws IOException {
+      WriterLength wl = new WriterLength();
+      Path familyDir = new Path(outputDir, Bytes.toString(family));
+      Algorithm compression = compressionMap.get(family);
+      compression = compression == null ? defaultCompression : compression;
+      BloomType bloomType = bloomTypeMap.get(family);
+      bloomType = bloomType == null ? BloomType.NONE : bloomType;
+      Integer blockSize = blockSizeMap.get(family);
+      blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+      DataBlockEncoding encoding = overriddenEncoding;
+      encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
+      encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+      Configuration tempConf = new Configuration(conf);
+      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+      HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
+          .withChecksumType(HStore.getChecksumType(conf))
+          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
+
+      if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+        contextBuilder.withIncludesTags(true);
+      }
 
-        this.writers.put(family, wl);
-        return wl;
+      contextBuilder.withDataBlockEncoding(encoding);
+      HFileContext hFileContext = contextBuilder.build();
+
+      if (null == favoredNodes) {
+        wl.writer = new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+            .withOutputDir(familyDir).withBloomType(bloomType)
+            .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+      } else {
+        wl.writer =
+            new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                .withOutputDir(familyDir).withBloomType(bloomType)
+                .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+                .withFavoredNodes(favoredNodes).build();
       }
 
-      private void close(final StoreFileWriter w) throws IOException {
-        if (w != null) {
-          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
-              Bytes.toBytes(System.currentTimeMillis()));
-          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
-              Bytes.toBytes(context.getTaskAttemptID().toString()));
-          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
-              Bytes.toBytes(true));
-          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
-              Bytes.toBytes(compactionExclude));
-          w.appendTrackedTimestampsToMetadata();
-          w.close();
-        }
+      this.writers.put(family, wl);
+      return wl;
+    }
+
+    private void close(final StoreFileWriter w) throws IOException {
+      if (w != null) {
+        w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(EnvironmentEdgeManager.currentTime()));
+        w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(context.getTaskAttemptID().toString()));
+        w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
+        w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude));
+        w.appendTrackedTimestampsToMetadata();
+        w.close();
       }
+    }
 
-      @Override
-      public void close(TaskAttemptContext c)
-      throws IOException, InterruptedException {
-        for (WriterLength wl: this.writers.values()) {
-          close(wl.writer);
-        }
+    @Override
+    public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+      for (WriterLength wl : this.writers.values()) {
+        close(wl.writer);
       }
-    };
+    }
   }
 
   /*
@@ -503,7 +534,7 @@ public class HFileOutputFormat2
     TableMapReduceUtil.initCredentials(job);
     LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
   }
-  
+
   public static void configureIncrementalLoadMap(Job job, HTableDescriptor tableDescriptor) throws
       IOException {
     Configuration conf = job.getConfiguration();

http://git-wip-us.apache.org/repos/asf/hbase/blob/60847a2d/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
new file mode 100644
index 0000000..7c1ebbc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiHFileOutputFormat.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+
+import com.google.common.annotations.VisibleForTesting;
+/**
+ * Creates a 3-level directory tree: the first level uses the table name as the parent directory,
+ * the second uses the family name as a child directory, and all HFiles for a family go under it.
+ * -tableName1
+ *   -columnFamilyName1
+ *   -columnFamilyName2
+ *     -HFiles
+ * -tableName2
+ *   -columnFamilyName1
+ *     -HFiles
+ *   -columnFamilyName2
+ * <p>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@VisibleForTesting
+public class MultiHFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+  private static final Log LOG = LogFactory.getLog(MultiHFileOutputFormat.class);
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, Cell>
+  getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException {
+    return createMultiHFileRecordWriter(context);
+  }
+
+  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+  createMultiHFileRecordWriter(final TaskAttemptContext context) throws IOException {
+
+    // Get the path of the output directory
+    final Path outputPath = FileOutputFormat.getOutputPath(context);
+    final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+    final Configuration conf = context.getConfiguration();
+    final FileSystem fs = outputDir.getFileSystem(conf);
+
+    // Map of tables to writers
+    final Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>> tableWriters =
+        new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, V>>();
+
+    return new RecordWriter<ImmutableBytesWritable, V>() {
+      @Override
+      public void write(ImmutableBytesWritable tableName, V cell)
+          throws IOException, InterruptedException {
+        RecordWriter<ImmutableBytesWritable, V> tableWriter = tableWriters.get(tableName);
+        // if this is a new table, make sure the table directory exists
+        if (tableWriter == null) {
+          // using table name as directory name
+          final Path tableOutputDir = new Path(outputDir, Bytes.toString(tableName.copyBytes()));
+          fs.mkdirs(tableOutputDir);
+          LOG.info("Writing Table '" + tableName.toString() + "' data into following directory"
+              + tableOutputDir.toString());
+
+          // Create writer for one specific table
+          tableWriter = new HFileOutputFormat2.HFileRecordWriter<V>(context, tableOutputDir);
+          // Put table into map
+          tableWriters.put(tableName, tableWriter);
+        }
+        // Write the cell into tableWriter; the row argument is passed as null because
+        // HFileRecordWriter#write takes the row key from the cell itself.
+        tableWriter.write(null, cell);
+      }
+
+      @Override
+      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+        for (RecordWriter<ImmutableBytesWritable, V> writer : tableWriters.values()) {
+          writer.close(c);
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/60847a2d/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
new file mode 100644
index 0000000..738ae5f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiHFileOutputFormat.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test for {@link MultiHFileOutputFormat}. Sets up and runs a mapreduce job that creates output
+ * directories and writes hfiles into them.
+ */
+@Category(MediumTests.class)
+public class TestMultiHFileOutputFormat {
+    private static final Log LOG = LogFactory.getLog(TestMultiHFileOutputFormat.class);
+
+    private final HBaseTestingUtility util = new HBaseTestingUtility();
+
+    /** Number of random rows generated by each map() invocation. */
+    private static final int ROWSPERSPLIT = 10;
+
+    private static final int KEYLEN_DEFAULT = 10;
+    private static final String KEYLEN_CONF = "randomkv.key.length";
+
+    private static final int VALLEN_DEFAULT = 10;
+    private static final String VALLEN_CONF = "randomkv.val.length";
+
+    /** Table names used as map output keys. */
+    private static final byte[][] TABLES =
+        { Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-1")),
+            Bytes.add(Bytes.toBytes(PerformanceEvaluation.TABLE_NAME), Bytes.toBytes("-2")) };
+
+    /** Column families written for every generated row. */
+    private static final byte[][] FAMILIES =
+        { Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
+            Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
+
+    private static final byte[] QUALIFIER = Bytes.toBytes("data");
+
+    /** Runs the test directly, outside of a JUnit runner. */
+    public static void main(String[] args) throws Exception {
+        new TestMultiHFileOutputFormat().testWritingDataIntoHFiles();
+    }
+
+    /**
+     * Run small MR job. This MR job will write HFiles into
+     * testWritingDataIntoHFiles/tableNames/columnFamilies/
+     */
+    @Test
+    public void testWritingDataIntoHFiles() throws Exception {
+        Configuration conf = util.getConfiguration();
+        util.startMiniCluster();
+        // Outer try: the cluster is shut down even if resolving the test dir or cleanup fails.
+        try {
+            Path testDir = util.getDataTestDirOnTestFS("testWritingDataIntoHFiles");
+            FileSystem fs = testDir.getFileSystem(conf);
+            LOG.info("testWritingDataIntoHFiles dir writing to dir: " + testDir);
+
+            // Set down this value or we OOME in eclipse.
+            conf.setInt("mapreduce.task.io.sort.mb", 20);
+            // Write a few files by setting max file size.
+            conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+
+            // Inner try: the output directory is removed whether or not the job succeeds.
+            try {
+                Job job = Job.getInstance(conf, "testWritingDataIntoHFiles");
+
+                FileOutputFormat.setOutputPath(job, testDir);
+
+                job.setInputFormatClass(NMapInputFormat.class);
+                job.setMapperClass(Random_TableKV_GeneratingMapper.class);
+                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+                job.setMapOutputValueClass(KeyValue.class);
+                job.setReducerClass(Table_KeyValueSortReducer.class);
+                job.setOutputFormatClass(MultiHFileOutputFormat.class);
+                job.getConfiguration().setStrings("io.serializations",
+                    conf.get("io.serializations"),
+                    MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+                    KeyValueSerialization.class.getName());
+
+                TableMapReduceUtil.addDependencyJars(job);
+                TableMapReduceUtil.initCredentials(job);
+                LOG.info("\nStarting test testWritingDataIntoHFiles\n");
+                assertTrue(job.waitForCompletion(true));
+                LOG.info("\nWaiting on checking MapReduce output\n");
+                assertTrue(checkMROutput(fs, testDir, 0));
+            } finally {
+                fs.delete(testDir, true);
+            }
+        } finally {
+            util.shutdownMiniCluster();
+        }
+    }
+
+    /**
+     * MR will output a 3 level directory, tableName->ColumnFamilyName->HFile. This method checks
+     * recursively whether the created directory layout is correct; the testDir had better be
+     * small.
+     *
+     * @param fs file system holding the job output
+     * @param testDir path to inspect at this recursion step
+     * @param level depth of {@code testDir} below the job output directory; at depth 3 the path
+     *          itself is checked for being an HFile
+     * @return true if everything below {@code testDir} has the expected layout
+     */
+    private boolean checkMROutput(FileSystem fs, Path testDir, int level)
+        throws FileNotFoundException, IOException {
+        if (level >= 3) {
+            return HFile.isHFileFormat(fs, testDir);
+        }
+        FileStatus[] fStats = fs.listStatus(testDir);
+        if (fStats == null || fStats.length <= 0) {
+            LOG.info("Created directory format is not correct");
+            return false;
+        }
+
+        for (FileStatus stats : fStats) {
+            // skip the _SUCCESS file created by MapReduce
+            if (level == 0
+                && stats.getPath().getName().endsWith(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+                continue;
+            }
+            // the first two levels (table, column family) must be directories
+            if (level < 2 && !stats.isDirectory()) {
+                LOG.info("Created directory format is not correct");
+                return false;
+            }
+            if (!checkMROutput(fs, stats.getPath(), level + 1)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Simple mapper that makes {@code <TableName, KeyValue>} output. With no input data
+     */
+    static class Random_TableKV_GeneratingMapper
+        extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
+
+        private int keyLength;
+        private int valLength;
+
+        /** Reads the random key and value lengths from the job configuration. */
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+
+            Configuration conf = context.getConfiguration();
+            keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
+            valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
+        }
+
+        /**
+         * Generates {@code ROWSPERSPLIT} random rows and emits, for each row, one KeyValue per
+         * table and column family.
+         */
+        @Override
+        protected void map(NullWritable n1, NullWritable n2,
+            Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
+            throws java.io.IOException, InterruptedException {
+
+            byte keyBytes[] = new byte[keyLength];
+            byte valBytes[] = new byte[valLength];
+
+            ArrayList<ImmutableBytesWritable> tables = new ArrayList<ImmutableBytesWritable>();
+            for (int i = 0; i < TABLES.length; i++) {
+                tables.add(new ImmutableBytesWritable(TABLES[i]));
+            }
+
+            int taskId = context.getTaskAttemptID().getTaskID().getId();
+            assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
+            Random random = new Random();
+
+            for (int i = 0; i < ROWSPERSPLIT; i++) {
+                random.nextBytes(keyBytes);
+                // Ensure that unique tasks generate unique keys
+                keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
+                random.nextBytes(valBytes);
+
+                for (ImmutableBytesWritable table : tables) {
+                    for (byte[] family : FAMILIES) {
+                        Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
+                        context.write(table, kv);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Simple Reducer that takes {@code <TableName, KeyValue>} input whose KeyValues are in no
+     * particular order, and outputs {@code <TableName, KeyValue>} with the KeyValues ordered.
+     */
+    static class Table_KeyValueSortReducer
+        extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+
+        /**
+         * Collects clones of all KeyValues of one table into a sorted set and writes them back
+         * out in {@link KeyValue#COMPARATOR} order.
+         */
+        @Override
+        protected void reduce(ImmutableBytesWritable table, java.lang.Iterable<KeyValue> kvs,
+            org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue,
+                ImmutableBytesWritable, KeyValue>.Context context)
+            throws java.io.IOException, InterruptedException {
+            TreeSet<KeyValue> sortedKvs = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+            for (KeyValue kv : kvs) {
+                try {
+                    // clone before storing, as the original reducer did
+                    sortedKvs.add(kv.clone());
+                } catch (CloneNotSupportedException e) {
+                    throw new java.io.IOException(e);
+                }
+            }
+            context.setStatus("Read " + sortedKvs.getClass());
+            int index = 0;
+            for (KeyValue kv : sortedKvs) {
+                context.write(table, kv);
+                if (++index % 100 == 0) {
+                    context.setStatus("Wrote " + index);
+                }
+            }
+        }
+    }
+
+}