Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:37 UTC

[37/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
new file mode 100644
index 0000000..b64271e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -0,0 +1,793 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static java.lang.String.format;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Splitter;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+/**
+ * Tool to import data from a TSV file.
+ *
+ * This tool is rather simplistic - it doesn't do any quoting or
+ * escaping, but is useful for many data loads.
+ *
+ * @see ImportTsv#usage(String)
+ */
+@InterfaceAudience.Public
+public class ImportTsv extends Configured implements Tool {
+
+  protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
+
+  final static String NAME = "importtsv";
+
+  public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
+  public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+  public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  public final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+  // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
+  // Move them out of the tool and let the mapper handle its own validation.
+  public final static String DRY_RUN_CONF_KEY = "importtsv.dry.run";
+  // If true, bad lines are logged to stderr. Default: false.
+  public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
+  public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns";
+  public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+  public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
+  public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
+  // This config is used to propagate credentials from parent MR jobs which launch
+  // ImportTSV jobs. See IntegrationTestImportTsv.
+  public final static String CREDENTIALS_LOCATION = "credentials_location";
+  final static String DEFAULT_SEPARATOR = "\t";
+  final static String DEFAULT_ATTRIBUTES_SEPERATOR = "=>";
+  final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ",";
+  final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+  public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String NO_STRICT_COL_FAMILY = "no.strict";
+  /**
+   * If table didn't exist and was created in dry-run mode, this flag is
+   * flipped to delete it when MR ends.
+   */
+  private static boolean DRY_RUN_TABLE_CREATED;
+
+  public static class TsvParser {
+    /**
+     * Column families and qualifiers mapped to the TSV columns
+     */
+    private final byte[][] families;
+    private final byte[][] qualifiers;
+
+    private final byte separatorByte;
+
+    private int rowKeyColumnIndex;
+
+    private int maxColumnCount;
+
+    // Default value must be negative
+    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
+    public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
+
+    public static final String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+
+    public static final String ATTRIBUTES_COLUMN_SPEC = "HBASE_ATTRIBUTES_KEY";
+
+    public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
+
+    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
+    private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+
+    public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
+
+    public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
+
+    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
+    private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
+    /**
+     * @param columnsSpecification the list of columns to parse out, comma separated.
+     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
+     * @param separatorStr the single-byte field separator, e.g. "\t"
+     */
+    public TsvParser(String columnsSpecification, String separatorStr) {
+      // Configure separator
+      byte[] separator = Bytes.toBytes(separatorStr);
+      Preconditions.checkArgument(separator.length == 1,
+        "TsvParser only supports single-byte separators");
+      separatorByte = separator[0];
+
+      // Configure columns
+      ArrayList<String> columnStrings = Lists.newArrayList(
+        Splitter.on(',').trimResults().split(columnsSpecification));
+
+      maxColumnCount = columnStrings.size();
+      families = new byte[maxColumnCount][];
+      qualifiers = new byte[maxColumnCount][];
+
+      for (int i = 0; i < columnStrings.size(); i++) {
+        String str = columnStrings.get(i);
+        if (ROWKEY_COLUMN_SPEC.equals(str)) {
+          rowKeyColumnIndex = i;
+          continue;
+        }
+        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+          timestampKeyColumnIndex = i;
+          continue;
+        }
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+          attrKeyColumnIndex = i;
+          continue;
+        }
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+          cellVisibilityColumnIndex = i;
+          continue;
+        }
+        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+          cellTTLColumnIndex = i;
+          continue;
+        }
+        String[] parts = str.split(":", 2);
+        if (parts.length == 1) {
+          families[i] = str.getBytes();
+          qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
+        } else {
+          families[i] = parts[0].getBytes();
+          qualifiers[i] = parts[1].getBytes();
+        }
+      }
+    }
+
+    public boolean hasTimestamp() {
+      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+    }
+
+    public int getTimestampKeyColumnIndex() {
+      return timestampKeyColumnIndex;
+    }
+
+    public boolean hasAttributes() {
+      return attrKeyColumnIndex != DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+    }
+
+    public boolean hasCellVisibility() {
+      return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+    }
+
+    public boolean hasCellTTL() {
+      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+    }
+
+    public int getAttributesKeyColumnIndex() {
+      return attrKeyColumnIndex;
+    }
+
+    public int getCellVisibilityColumnIndex() {
+      return cellVisibilityColumnIndex;
+    }
+
+    public int getCellTTLColumnIndex() {
+      return cellTTLColumnIndex;
+    }
+
+    public int getRowKeyColumnIndex() {
+      return rowKeyColumnIndex;
+    }
+
+    public byte[] getFamily(int idx) {
+      return families[idx];
+    }
+    public byte[] getQualifier(int idx) {
+      return qualifiers[idx];
+    }
+
+    public ParsedLine parse(byte[] lineBytes, int length)
+    throws BadTsvLineException {
+      // Enumerate separator offsets
+      ArrayList<Integer> tabOffsets = new ArrayList<>(maxColumnCount);
+      for (int i = 0; i < length; i++) {
+        if (lineBytes[i] == separatorByte) {
+          tabOffsets.add(i);
+        }
+      }
+      if (tabOffsets.isEmpty()) {
+        throw new BadTsvLineException("No delimiter");
+      }
+
+      tabOffsets.add(length);
+
+      if (tabOffsets.size() > maxColumnCount) {
+        throw new BadTsvLineException("Excessive columns");
+      } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
+        throw new BadTsvLineException("No row key");
+      } else if (hasTimestamp()
+          && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+        throw new BadTsvLineException("No timestamp");
+      } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
+        throw new BadTsvLineException("No attributes specified");
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+        throw new BadTsvLineException("No cell visibility specified");
+      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+        throw new BadTsvLineException("No cell TTL specified");
+      }
+      return new ParsedLine(tabOffsets, lineBytes);
+    }
+
+    class ParsedLine {
+      private final ArrayList<Integer> tabOffsets;
+      private byte[] lineBytes;
+
+      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
+        this.tabOffsets = tabOffsets;
+        this.lineBytes = lineBytes;
+      }
+
+      public int getRowKeyOffset() {
+        return getColumnOffset(rowKeyColumnIndex);
+      }
+      public int getRowKeyLength() {
+        return getColumnLength(rowKeyColumnIndex);
+      }
+
+      public long getTimestamp(long ts) throws BadTsvLineException {
+        // Return ts if HBASE_TS_KEY is not configured in column spec
+        if (!hasTimestamp()) {
+          return ts;
+        }
+
+        String timeStampStr = Bytes.toString(lineBytes,
+            getColumnOffset(timestampKeyColumnIndex),
+            getColumnLength(timestampKeyColumnIndex));
+        try {
+          return Long.parseLong(timeStampStr);
+        } catch (NumberFormatException nfe) {
+          // treat this record as bad record
+          throw new BadTsvLineException("Invalid timestamp " + timeStampStr);
+        }
+      }
+
+      private String getAttributes() {
+        if (!hasAttributes()) {
+          return null;
+        } else {
+          return Bytes.toString(lineBytes, getColumnOffset(attrKeyColumnIndex),
+              getColumnLength(attrKeyColumnIndex));
+        }
+      }
+
+      public String[] getIndividualAttributes() {
+        String attributes = getAttributes();
+        if (attributes != null) {
+          return attributes.split(DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR);
+        } else {
+          return null;
+        }
+      }
+
+      public int getAttributeKeyOffset() {
+        if (hasAttributes()) {
+          return getColumnOffset(attrKeyColumnIndex);
+        } else {
+          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+        }
+      }
+
+      public int getAttributeKeyLength() {
+        if (hasAttributes()) {
+          return getColumnLength(attrKeyColumnIndex);
+        } else {
+          return DEFAULT_ATTRIBUTES_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellVisibilityColumnOffset() {
+        if (hasCellVisibility()) {
+          return getColumnOffset(cellVisibilityColumnIndex);
+        } else {
+          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellVisibilityColumnLength() {
+        if (hasCellVisibility()) {
+          return getColumnLength(cellVisibilityColumnIndex);
+        } else {
+          return DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+        }
+      }
+
+      public String getCellVisibility() {
+        if (!hasCellVisibility()) {
+          return null;
+        } else {
+          return Bytes.toString(lineBytes, getColumnOffset(cellVisibilityColumnIndex),
+              getColumnLength(cellVisibilityColumnIndex));
+        }
+      }
+
+      public int getCellTTLColumnOffset() {
+        if (hasCellTTL()) {
+          return getColumnOffset(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellTTLColumnLength() {
+        if (hasCellTTL()) {
+          return getColumnLength(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public long getCellTTL() {
+        if (!hasCellTTL()) {
+          return 0;
+        } else {
+          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+              getColumnLength(cellTTLColumnIndex));
+        }
+      }
+
+      public int getColumnOffset(int idx) {
+        if (idx > 0)
+          return tabOffsets.get(idx - 1) + 1;
+        else
+          return 0;
+      }
+      public int getColumnLength(int idx) {
+        return tabOffsets.get(idx) - getColumnOffset(idx);
+      }
+      public int getColumnCount() {
+        return tabOffsets.size();
+      }
+      public byte[] getLineBytes() {
+        return lineBytes;
+      }
+    }
+
+    public static class BadTsvLineException extends Exception {
+      public BadTsvLineException(String err) {
+        super(err);
+      }
+      private static final long serialVersionUID = 1L;
+    }
+
+    /**
+     * Return starting position and length of row key from the specified line bytes.
+     * @param lineBytes the line to scan
+     * @param length number of bytes of the line to consider
+     * @return Pair of row key offset and length.
+     * @throws BadTsvLineException if the row key is missing or empty
+     */
+    public Pair<Integer, Integer> parseRowKey(byte[] lineBytes, int length)
+        throws BadTsvLineException {
+      int rkColumnIndex = 0;
+      int startPos = 0, endPos = 0;
+      for (int i = 0; i <= length; i++) {
+        if (i == length || lineBytes[i] == separatorByte) {
+          endPos = i - 1;
+          if (rkColumnIndex++ == getRowKeyColumnIndex()) {
+            if ((endPos + 1) == startPos) {
+              throw new BadTsvLineException("Empty value for ROW KEY.");
+            }
+            break;
+          } else {
+            startPos = endPos + 2;
+          }
+        }
+        if (i == length) {
+          throw new BadTsvLineException(
+              "Row key does not exist as number of columns in the line"
+                  + " are less than row key position.");
+        }
+      }
+      return new Pair<>(startPos, endPos - startPos + 1);
+    }
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  protected static Job createSubmittableJob(Configuration conf, String[] args)
+      throws IOException, ClassNotFoundException {
+    Job job = null;
+    boolean isDryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+    try (Connection connection = ConnectionFactory.createConnection(conf)) {
+      try (Admin admin = connection.getAdmin()) {
+        // Support separator characters that cannot be represented in XML configs
+        // by re-encoding the passed separator as a Base64 string.
+        String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
+        if (actualSeparator != null) {
+          conf.set(SEPARATOR_CONF_KEY,
+              Base64.encodeBytes(actualSeparator.getBytes()));
+        }
+
+        // See if a non-default Mapper was set
+        String mapperClassName = conf.get(MAPPER_CONF_KEY);
+        Class mapperClass = mapperClassName != null? Class.forName(mapperClassName): DEFAULT_MAPPER;
+
+        TableName tableName = TableName.valueOf(args[0]);
+        Path inputDir = new Path(args[1]);
+        String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
+        job = Job.getInstance(conf, jobName);
+        job.setJarByClass(mapperClass);
+        FileInputFormat.setInputPaths(job, inputDir);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setMapperClass(mapperClass);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+        String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
+        if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
+          String fileLoc = conf.get(CREDENTIALS_LOCATION);
+          Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
+          job.getCredentials().addAll(cred);
+        }
+
+        if (hfileOutPath != null) {
+          if (!admin.tableExists(tableName)) {
+            LOG.warn(format("Table '%s' does not exist.", tableName));
+            if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
+              // TODO: this is backwards. Instead of depending on the existence of a table,
+              // create a sane splits file for HFileOutputFormat based on data sampling.
+              createTable(admin, tableName, columns);
+              if (isDryRun) {
+                LOG.warn("Dry run: Table will be deleted at end of dry run.");
+                synchronized (ImportTsv.class) {
+                  DRY_RUN_TABLE_CREATED = true;
+                }
+              }
+            } else {
+              String errorMsg =
+                  format("Table '%s' does not exist and '%s' is set to no.", tableName,
+                      CREATE_TABLE_CONF_KEY);
+              LOG.error(errorMsg);
+              throw new TableNotFoundException(errorMsg);
+            }
+          }
+          try (Table table = connection.getTable(tableName);
+              RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
+            boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
+            // if no.strict is false then check column family
+            if(!noStrict) {
+              ArrayList<String> unmatchedFamilies = new ArrayList<>();
+              Set<String> cfSet = getColumnFamilies(columns);
+              TableDescriptor tDesc = table.getDescriptor();
+              for (String cf : cfSet) {
+                if(!tDesc.hasColumnFamily(Bytes.toBytes(cf))) {
+                  unmatchedFamilies.add(cf);
+                }
+              }
+              if(unmatchedFamilies.size() > 0) {
+                ArrayList<String> familyNames = new ArrayList<>();
+                for (ColumnFamilyDescriptor family : table.getDescriptor().getColumnFamilies()) {
+                  familyNames.add(family.getNameAsString());
+                }
+                String msg =
+                    "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
+                    + " does not match with any of the table " + tableName
+                    + " column families " + familyNames + ".\n"
+                    + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
+                    + "=true.\n";
+                usage(msg);
+                System.exit(-1);
+              }
+            }
+            if (mapperClass.equals(TsvImporterTextMapper.class)) {
+              job.setMapOutputValueClass(Text.class);
+              job.setReducerClass(TextSortReducer.class);
+            } else {
+              job.setMapOutputValueClass(Put.class);
+              job.setCombinerClass(PutCombiner.class);
+              job.setReducerClass(PutSortReducer.class);
+            }
+            if (!isDryRun) {
+              Path outputDir = new Path(hfileOutPath);
+              FileOutputFormat.setOutputPath(job, outputDir);
+              HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(),
+                  regionLocator);
+            }
+          }
+        } else {
+          if (!admin.tableExists(tableName)) {
+            String errorMsg = format("Table '%s' does not exist.", tableName);
+            LOG.error(errorMsg);
+            throw new TableNotFoundException(errorMsg);
+          }
+          if (mapperClass.equals(TsvImporterTextMapper.class)) {
+            usage(TsvImporterTextMapper.class.toString()
+                + " should not be used for non bulkloading case. use "
+                + TsvImporterMapper.class.toString()
+                + " or custom mapper whose value type is Put.");
+            System.exit(-1);
+          }
+          if (!isDryRun) {
+            // No reducers. Just write straight to table. Call initTableReducerJob
+            // to set up the TableOutputFormat.
+            TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+          }
+          job.setNumReduceTasks(0);
+        }
+        if (isDryRun) {
+          job.setOutputFormatClass(NullOutputFormat.class);
+          job.getConfiguration().setStrings("io.serializations",
+              job.getConfiguration().get("io.serializations"),
+              MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+              KeyValueSerialization.class.getName());
+        }
+        TableMapReduceUtil.addDependencyJars(job);
+        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+            org.apache.hadoop.hbase.shaded.com.google.common.base.Function.class /* Guava used by TsvParser */);
+      }
+    }
+    return job;
+  }
+
+  private static void createTable(Admin admin, TableName tableName, String[] columns)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    Set<String> cfSet = getColumnFamilies(columns);
+    for (String cf : cfSet) {
+      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+      htd.addFamily(hcd);
+    }
+    LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.",
+      tableName, cfSet));
+    admin.createTable(htd);
+  }
+
+  private static void deleteTable(Configuration conf, String[] args) {
+    TableName tableName = TableName.valueOf(args[0]);
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+         Admin admin = connection.getAdmin()) {
+      try {
+        admin.disableTable(tableName);
+      } catch (TableNotEnabledException e) {
+        LOG.debug("Dry mode: Table: " + tableName + " already disabled, so just deleting it.");
+      }
+      admin.deleteTable(tableName);
+    } catch (IOException e) {
+      LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName,
+          e.toString()));
+      return;
+    }
+    LOG.info(format("Dry run: Deleted table '%s'.", tableName));
+  }
+
+  private static Set<String> getColumnFamilies(String[] columns) {
+    Set<String> cfSet = new HashSet<>();
+    for (String aColumn : columns) {
+      if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
+        continue;
+      // we are only concerned with the first one (in case this is a cf:cq)
+      cfSet.add(aColumn.split(":", 2)[0]);
+    }
+    return cfSet;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    String usage =
+      "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
+      "\n" +
+      "Imports the given input directory of TSV data into the specified table.\n" +
+      "\n" +
+      "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
+      "option. This option takes the form of comma-separated column names, where each\n" +
+      "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
+      "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
+      "as the row key for each imported record. You must specify exactly one column\n" +
+      "to be the row key, and you must specify a column name for every column that exists in the\n" +
+      "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
+      " designates that this column should be\n" +
+      "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
+      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" +
+      "You must specify at most one column as timestamp key for each imported record.\n" +
+      "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
+      "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
+      "\n" +
+      "Other special columns that can be specified are " + TsvParser.CELL_TTL_COLUMN_SPEC +
+      " and " + TsvParser.CELL_VISIBILITY_COLUMN_SPEC + ".\n" +
+      TsvParser.CELL_TTL_COLUMN_SPEC + " designates that this column will be used " +
+      "as a Cell's Time To Live (TTL) attribute.\n" +
+      TsvParser.CELL_VISIBILITY_COLUMN_SPEC + " designates that this column contains the " +
+      "visibility label expression.\n" +
+      "\n" +
+      TsvParser.ATTRIBUTES_COLUMN_SPEC + " can be used to specify Operation Attributes per record.\n" +
+      " Attributes should be specified as key=>value pairs, where '" + DEFAULT_ATTRIBUTES_SEPERATOR + "'\n" +
+      " is the separator. Note that more than one operation attribute can be specified per record.\n" +
+      "\n" +
+      "By default importtsv will load data directly into HBase. To instead generate\n" +
+      "HFiles of data to prepare for a bulk data load, pass the option:\n" +
+      "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
+      "  Note: if you do not use this option, then the target table must already exist in HBase\n" +
+      "\n" +
+      "Other options that may be specified with -D include:\n" +
+      "  -D" + DRY_RUN_CONF_KEY + "=true - Dry run mode. Data is not actually populated into" +
+      " table. If table does not exist, it is created but deleted in the end.\n" +
+      "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
+      "  -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
+      "  -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" +
+      "  '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
+      "  -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
+      "  -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
+      DEFAULT_MAPPER.getName() + "\n" +
+      "  -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" +
+      "  -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
+      "  Note: if you set this to 'no', then the target table must already exist in HBase\n" +
+      "  -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " +
+      "Default is false\n\n" +
+      "For performance consider the following options:\n" +
+      "  -Dmapreduce.map.speculative=false\n" +
+      "  -Dmapreduce.reduce.speculative=false";
+
+    System.err.println(usage);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      return -1;
+    }
+
+    // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
+    // perform validation on these additional args. When it's not null, the user has provided
+    // their own mapper, so these validations are not relevant.
+    // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
+    if (null == getConf().get(MAPPER_CONF_KEY)) {
+      // Make sure columns are specified
+      String[] columns = getConf().getStrings(COLUMNS_CONF_KEY);
+      if (columns == null) {
+        usage("No columns specified. Please specify with -D" +
+            COLUMNS_CONF_KEY+"=...");
+        return -1;
+      }
+
+      // Make sure they specify exactly one column as the row key
+      int rowkeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
+      }
+      if (rowkeysFound != 1) {
+        usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
+        return -1;
+      }
+
+      // Make sure we have at most one column as the timestamp key
+      int tskeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+          tskeysFound++;
+      }
+      if (tskeysFound > 1) {
+        usage("Must specify at most one column as "
+            + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
+        return -1;
+      }
+
+      int attrKeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.ATTRIBUTES_COLUMN_SPEC))
+          attrKeysFound++;
+      }
+      if (attrKeysFound > 1) {
+        usage("Must specify at most one column as "
+            + TsvParser.ATTRIBUTES_COLUMN_SPEC);
+        return -1;
+      }
+
+      // Make sure one or more columns are specified excluding rowkey and
+      // timestamp key
+      if (columns.length - (rowkeysFound + tskeysFound + attrKeysFound) < 1) {
+        usage("One or more columns in addition to the row key and timestamp(optional) are required");
+        return -1;
+      }
+    }
+
+    // If timestamp option is not specified, use current system time.
+    long timestamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    // Set it back to replace an invalid (non-numeric) timestamp with the current
+    // system time.
+    getConf().setLong(TIMESTAMP_CONF_KEY, timestamp);
+
+    synchronized (ImportTsv.class) {
+      DRY_RUN_TABLE_CREATED = false;
+    }
+    Job job = createSubmittableJob(getConf(), args);
+    boolean success = job.waitForCompletion(true);
+    boolean delete = false;
+    synchronized (ImportTsv.class) {
+      delete = DRY_RUN_TABLE_CREATED;
+    }
+    if (delete) {
+      deleteTable(getConf(), args);
+    }
+    return success ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(HBaseConfiguration.create(), new ImportTsv(), args);
+    System.exit(status);
+  }
+}
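
A minimal sketch (not part of the patch above) of how the TsvParser API defined in this
file can be exercised directly. The class name and sample line are hypothetical; the
snippet is placed in the org.apache.hadoop.hbase.mapreduce package because ParsedLine
is package-private.

  package org.apache.hadoop.hbase.mapreduce;

  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
  import org.apache.hadoop.hbase.util.Bytes;

  public class TsvParserExample {
    public static void main(String[] args) throws BadTsvLineException {
      // First field is the row key; second field maps to family "d", qualifier "c1".
      TsvParser parser = new TsvParser("HBASE_ROW_KEY,d:c1", "\t");
      byte[] line = Bytes.toBytes("row1\tvalue1");
      TsvParser.ParsedLine parsed = parser.parse(line, line.length);
      String rowKey = Bytes.toString(line, parsed.getRowKeyOffset(), parsed.getRowKeyLength());
      String value = Bytes.toString(line, parsed.getColumnOffset(1), parsed.getColumnLength(1));
      System.out.println(rowKey + " -> " + value); // prints "row1 -> value1"
    }
  }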

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
new file mode 100644
index 0000000..953df62
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/JarFinder.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.text.MessageFormat;
+import java.util.Enumeration;
+import java.util.jar.JarFile;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * Finds the Jar for a class. If the class is in a directory in the
+ * classpath, it creates a Jar on the fly with the contents of the directory
+ * and returns the path to that Jar. If a Jar is created, it is created in
+ * the system temporary directory.
+ *
+ * This file was forked from hadoop/common/branches/branch-2@1377176.
+ */
+public class JarFinder {
+
+  private static void copyToZipStream(File file, ZipEntry entry,
+                              ZipOutputStream zos) throws IOException {
+    InputStream is = new FileInputStream(file);
+    try {
+      zos.putNextEntry(entry);
+      byte[] arr = new byte[4096];
+      int read = is.read(arr);
+      while (read > -1) {
+        zos.write(arr, 0, read);
+        read = is.read(arr);
+      }
+    } finally {
+      try {
+        is.close();
+      } finally {
+        zos.closeEntry();
+      }
+    }
+  }
+
+  public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
+    throws IOException {
+    Preconditions.checkNotNull(relativePath, "relativePath");
+    Preconditions.checkNotNull(zos, "zos");
+
+    // by JAR spec, if there is a manifest, it must be the first entry in the
+    // ZIP.
+    File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
+    ZipEntry manifestEntry = new ZipEntry(JarFile.MANIFEST_NAME);
+    if (!manifestFile.exists()) {
+      zos.putNextEntry(manifestEntry);
+      new Manifest().write(new BufferedOutputStream(zos));
+      zos.closeEntry();
+    } else {
+      copyToZipStream(manifestFile, manifestEntry, zos);
+    }
+    zos.closeEntry();
+    zipDir(dir, relativePath, zos, true);
+    zos.close();
+  }
+
+  private static void zipDir(File dir, String relativePath, ZipOutputStream zos,
+                             boolean start) throws IOException {
+    String[] dirList = dir.list();
+    if (dirList == null) {
+      return;
+    }
+    for (String aDirList : dirList) {
+      File f = new File(dir, aDirList);
+      if (!f.isHidden()) {
+        if (f.isDirectory()) {
+          if (!start) {
+            ZipEntry dirEntry = new ZipEntry(relativePath + f.getName() + "/");
+            zos.putNextEntry(dirEntry);
+            zos.closeEntry();
+          }
+          String filePath = f.getPath();
+          File file = new File(filePath);
+          zipDir(file, relativePath + f.getName() + "/", zos, false);
+        }
+        else {
+          String path = relativePath + f.getName();
+          if (!path.equals(JarFile.MANIFEST_NAME)) {
+            ZipEntry anEntry = new ZipEntry(path);
+            copyToZipStream(f, anEntry, zos);
+          }
+        }
+      }
+    }
+  }
+
+  private static void createJar(File dir, File jarFile) throws IOException {
+    Preconditions.checkNotNull(dir, "dir");
+    Preconditions.checkNotNull(jarFile, "jarFile");
+    File jarDir = jarFile.getParentFile();
+    if (!jarDir.exists()) {
+      if (!jarDir.mkdirs()) {
+        throw new IOException(MessageFormat.format("could not create dir [{0}]",
+                                                   jarDir));
+      }
+    }
+    try (FileOutputStream fos = new FileOutputStream(jarFile);
+         JarOutputStream jos = new JarOutputStream(fos)) {
+      jarDir(dir, "", jos);
+    }
+  }
+
+  /**
+   * Returns the full path to the Jar containing the class. It always returns a
+   * JAR.
+   *
+   * @param klass class.
+   *
+   * @return path to the Jar containing the class.
+   */
+  public static String getJar(Class klass) {
+    Preconditions.checkNotNull(klass, "klass");
+    ClassLoader loader = klass.getClassLoader();
+    if (loader != null) {
+      String class_file = klass.getName().replaceAll("\\.", "/") + ".class";
+      try {
+        for (Enumeration itr = loader.getResources(class_file);
+             itr.hasMoreElements(); ) {
+          URL url = (URL) itr.nextElement();
+          String path = url.getPath();
+          if (path.startsWith("file:")) {
+            path = path.substring("file:".length());
+          }
+          path = URLDecoder.decode(path, "UTF-8");
+          if ("jar".equals(url.getProtocol())) {
+            path = URLDecoder.decode(path, "UTF-8");
+            return path.replaceAll("!.*$", "");
+          }
+          else if ("file".equals(url.getProtocol())) {
+            String klassName = klass.getName();
+            klassName = klassName.replace(".", "/") + ".class";
+            path = path.substring(0, path.length() - klassName.length());
+            File baseDir = new File(path);
+            File testDir = new File(System.getProperty("test.build.dir", "target/test-dir"));
+            testDir = testDir.getAbsoluteFile();
+            if (!testDir.exists()) {
+              testDir.mkdirs();
+            }
+            File tempJar = File.createTempFile("hadoop-", "", testDir);
+            tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+            tempJar.deleteOnExit();
+            createJar(baseDir, tempJar);
+            return tempJar.getAbsolutePath();
+          }
+        }
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+}
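
A brief, hedged illustration (not part of the commit) of how JarFinder.getJar is
typically used: resolve an existing jar that contains a class, or build one on the fly
from the class's directory so it can be shipped with a MapReduce job. The example class
name is hypothetical.

  import org.apache.hadoop.hbase.mapreduce.JarFinder;

  public class JarFinderExample {
    public static void main(String[] args) {
      // Returns the path of the jar already on the classpath, or of a temporary jar
      // created from the directory that holds the compiled class files.
      String jar = JarFinder.getJar(JarFinder.class);
      System.out.println("Jar for JarFinder: " + jar);
    }
  }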

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
new file mode 100644
index 0000000..241608b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSerialization.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+
+@InterfaceAudience.Public
+public class KeyValueSerialization implements Serialization<KeyValue> {
+  @Override
+  public boolean accept(Class<?> c) {
+    return KeyValue.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public KeyValueDeserializer getDeserializer(Class<KeyValue> t) {
+    return new KeyValueDeserializer();
+  }
+
+  @Override
+  public KeyValueSerializer getSerializer(Class<KeyValue> c) {
+    return new KeyValueSerializer();
+  }
+
+  public static class KeyValueDeserializer implements Deserializer<KeyValue> {
+    private DataInputStream dis;
+
+    @Override
+    public void close() throws IOException {
+      this.dis.close();
+    }
+
+    @Override
+    public KeyValue deserialize(KeyValue ignore) throws IOException {
+      // I can't overwrite the passed in KV, not from a proto kv, not just yet.  TODO
+      return KeyValueUtil.create(this.dis);
+    }
+
+    @Override
+    public void open(InputStream is) throws IOException {
+      this.dis = new DataInputStream(is);
+    }
+  }
+
+  public static class KeyValueSerializer implements Serializer<KeyValue> {
+    private DataOutputStream dos;
+
+    @Override
+    public void close() throws IOException {
+      this.dos.close();
+    }
+
+    @Override
+    public void open(OutputStream os) throws IOException {
+      this.dos = new DataOutputStream(os);
+    }
+
+    @Override
+    public void serialize(KeyValue kv) throws IOException {
+      KeyValueUtil.write(kv, this.dos);
+    }
+  }
+}
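
A hedged sketch (not part of the commit) that round-trips a single KeyValue through the
serializer and deserializer defined above; the class name and cell contents are made up.
At the job level the same classes are plugged in via the "io.serializations" key, as
ImportTsv does for its dry-run mode earlier in this patch.

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;

  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
  import org.apache.hadoop.hbase.util.Bytes;

  public class KeyValueRoundTrip {
    public static void main(String[] args) throws Exception {
      KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes("value"));

      KeyValueSerialization serialization = new KeyValueSerialization();
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      KeyValueSerialization.KeyValueSerializer serializer =
          serialization.getSerializer(KeyValue.class);
      serializer.open(out);
      serializer.serialize(kv);
      serializer.close();

      KeyValueSerialization.KeyValueDeserializer deserializer =
          serialization.getDeserializer(KeyValue.class);
      deserializer.open(new ByteArrayInputStream(out.toByteArray()));
      KeyValue copy = deserializer.deserialize(null);
      deserializer.close();

      // The deserialized cell carries the same row bytes as the original.
      System.out.println(Bytes.toString(copy.getRowArray(), copy.getRowOffset(),
          copy.getRowLength())); // prints "row"
    }
  }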

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
new file mode 100644
index 0000000..997e5a8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyValueSortReducer.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Emits sorted KeyValues.
+ * Reads in all KeyValues from passed Iterator, sorts them, then emits
+ * KeyValues in sorted order.  If lots of columns per row, it will use lots of
+ * memory sorting.
+ * @see HFileOutputFormat2
+ */
+@InterfaceAudience.Public
+public class KeyValueSortReducer
+    extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
+  protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs,
+      Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+  throws java.io.IOException, InterruptedException {
+    TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR);
+    for (KeyValue kv: kvs) {
+      try {
+        map.add(kv.clone());
+      } catch (CloneNotSupportedException e) {
+        throw new java.io.IOException(e);
+      }
+    }
+    context.setStatus("Read " + map.getClass());
+    int index = 0;
+    for (KeyValue kv: map) {
+      context.write(row, kv);
+      if (++index % 100 == 0) context.setStatus("Wrote " + index);
+    }
+  }
+}
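
A hedged wiring sketch (not part of the commit) showing where KeyValueSortReducer sits
in a bulk-load job; the mapper is left hypothetical, and in practice
HFileOutputFormat2.configureIncrementalLoad performs this wiring for you.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
  import org.apache.hadoop.mapreduce.Job;

  public class SortReducerWiring {
    public static Job createJob(Configuration conf) throws Exception {
      Job job = Job.getInstance(conf, "keyvalue-sort-example");
      // A (hypothetical) mapper must emit (ImmutableBytesWritable row, KeyValue) pairs,
      // which KeyValueSortReducer then sorts per row before they are written as HFiles.
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
      job.setReducerClass(KeyValueSortReducer.class);
      return job;
    }
  }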

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
new file mode 100644
index 0000000..9f783f1
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableHFileOutputFormat.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Creates a three-level directory tree: the first level is named after the table,
+ * the second level is named after the column family, and all HFiles for a given
+ * family are written under that family's directory.
+ * -tableName1
+ *     -columnFamilyName1
+ *         -HFiles
+ *     -columnFamilyName2
+ *         -HFiles
+ * -tableName2
+ *     -columnFamilyName1
+ *         -HFiles
+ *     -columnFamilyName2
+ *         -HFiles
+ */
+@InterfaceAudience.Public
+@VisibleForTesting
+public class MultiTableHFileOutputFormat extends HFileOutputFormat2 {
+  private static final Log LOG = LogFactory.getLog(MultiTableHFileOutputFormat.class);
+
+  /**
+   * Creates a composite key to use as a mapper output key when using
+   * MultiTableHFileOutputFormat.configureIncrementalLoad to set up a bulk ingest job
+   *
+   * @param tableName Name of the Table - Eg: TableName.getNameAsString()
+   * @param suffix    Usually represents a row key (when creating a mapper key) or a column family
+   * @return          byte[] representation of composite key
+   */
+  public static byte[] createCompositeKey(byte[] tableName,
+                                          byte[] suffix) {
+    return combineTableNameSuffix(tableName, suffix);
+  }
+
+  /**
+   * Alternate api which accepts an ImmutableBytesWritable for the suffix
+   * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
+   */
+  public static byte[] createCompositeKey(byte[] tableName,
+                                          ImmutableBytesWritable suffix) {
+    return combineTableNameSuffix(tableName, suffix.get());
+  }
+
+  /**
+   * Alternate api which accepts a String for the tableName and ImmutableBytesWritable for the
+   * suffix
+   * @see MultiTableHFileOutputFormat#createCompositeKey(byte[], byte[])
+   */
+  public static byte[] createCompositeKey(String tableName,
+                                          ImmutableBytesWritable suffix) {
+    return combineTableNameSuffix(tableName.getBytes(Charset.forName("UTF-8")), suffix.get());
+  }
+
+  /**
+   * Analogous to
+   * {@link HFileOutputFormat2#configureIncrementalLoad(Job, TableDescriptor, RegionLocator)},
+   * this function will configure the requisite number of reducers to write HFiles for multiple
+   * tables simultaneously
+   *
+   * @param job                   See {@link org.apache.hadoop.mapreduce.Job}
+   * @param multiTableDescriptors Table descriptor and region locator pairs
+   * @throws IOException
+   */
+  public static void configureIncrementalLoad(Job job, List<TableInfo>
+      multiTableDescriptors)
+      throws IOException {
+    MultiTableHFileOutputFormat.configureIncrementalLoad(job, multiTableDescriptors,
+            MultiTableHFileOutputFormat.class);
+  }
+
+  private static int validateCompositeKey(byte[] keyBytes) {
+    int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
+
+    // Either the separator was not found or a tablename wasn't present or a key wasn't present
+    if (separatorIdx == -1) {
+      throw new IllegalArgumentException("Invalid format for composite key [" + Bytes
+              .toStringBinary(keyBytes) + "]. Cannot extract tablename and suffix from key");
+    }
+    return separatorIdx;
+  }
+
+  protected static byte[] getTableName(byte[] keyBytes) {
+    int separatorIdx = validateCompositeKey(keyBytes);
+    return Bytes.copy(keyBytes, 0, separatorIdx);
+  }
+
+  protected static byte[] getSuffix(byte[] keyBytes) {
+    int separatorIdx = validateCompositeKey(keyBytes);
+    return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length - separatorIdx - 1);
+  }
+}
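
A hedged sketch (not part of the commit) of a mapper that targets a table through
MultiTableHFileOutputFormat: the output key is the composite (table name + row key)
built with createCompositeKey above. The table, family, and qualifier names are
hypothetical, and the surrounding job setup (configureIncrementalLoad with a list of
TableInfo entries) is omitted.

  import java.io.IOException;

  import org.apache.hadoop.hbase.KeyValue;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public class CompositeKeyMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    private static final byte[] TABLE_A = Bytes.toBytes("tableA");

    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      byte[] row = Bytes.toBytes(line.toString());
      // Prefix the row key with the destination table so the output format can route
      // this cell to the right table/family directory.
      byte[] outKey = MultiTableHFileOutputFormat.createCompositeKey(TABLE_A, row);
      KeyValue kv = new KeyValue(row, Bytes.toBytes("f"), Bytes.toBytes("q"),
          Bytes.toBytes("v"));
      context.write(new ImmutableBytesWritable(outKey), kv);
    }
  }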

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
new file mode 100644
index 0000000..f8fb6dc
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormat.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+
+/**
+ * Convert HBase tabular data from multiple scanners into a format that
+ * is consumable by Map/Reduce.
+ *
+ * <p>
+ * Usage example
+ * </p>
+ *
+ * <pre>
+ * List&lt;Scan&gt; scans = new ArrayList&lt;Scan&gt;();
+ *
+ * Scan scan1 = new Scan();
+ * scan1.setStartRow(firstRow1);
+ * scan1.setStopRow(lastRow1);
+ * scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table1);
+ * scans.add(scan1);
+ *
+ * Scan scan2 = new Scan();
+ * scan2.setStartRow(firstRow2);
+ * scan2.setStopRow(lastRow2);
+ * scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, table2);
+ * scans.add(scan2);
+ *
+ * TableMapReduceUtil.initTableMapperJob(scans, TableMapper.class, Text.class,
+ *     IntWritable.class, job);
+ * </pre>
+ */
+@InterfaceAudience.Public
+public class MultiTableInputFormat extends MultiTableInputFormatBase implements
+    Configurable {
+
+  /** Job parameter that specifies the scan list. */
+  public static final String SCANS = "hbase.mapreduce.scans";
+
+  /** The configuration. */
+  private Configuration conf = null;
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to set the details for the tables to
+   *  be scanned.
+   *
+   * @param configuration The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *        org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String[] rawScans = conf.getStrings(SCANS);
+    if (rawScans == null || rawScans.length == 0) {
+      throw new IllegalArgumentException(
+          "There must be at least one scan configured via " + SCANS);
+    }
+    List<Scan> scans = new ArrayList<>();
+
+    for (int i = 0; i < rawScans.length; i++) {
+      try {
+        scans.add(TableMapReduceUtil.convertStringToScan(rawScans[i]));
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to convert Scan : " + rawScans[i] + " to string", e);
+      }
+    }
+    this.setScans(scans);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
new file mode 100644
index 0000000..5d541a6
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableInputFormatBase.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A base for {@link MultiTableInputFormat}s. Receives a list of
+ * {@link Scan} instances that define the input tables and
+ * filters etc. Subclasses may use other TableRecordReader implementations.
+ */
+@InterfaceAudience.Public
+public abstract class MultiTableInputFormatBase extends
+    InputFormat<ImmutableBytesWritable, Result> {
+
+  private static final Log LOG = LogFactory.getLog(MultiTableInputFormatBase.class);
+
+  /** Holds the set of scans used to define the input. */
+  private List<Scan> scans;
+
+  /** The reader scanning the table, can be a custom one. */
+  private TableRecordReader tableRecordReader = null;
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   *
+   * @param split The split to work with.
+   * @param context The current context.
+   * @return The newly created record reader.
+   * @throws IOException When creating the reader fails.
+   * @throws InterruptedException when record reader initialization fails
+   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
+   *      org.apache.hadoop.mapreduce.InputSplit,
+   *      org.apache.hadoop.mapreduce.TaskAttemptContext)
+   */
+  @Override
+  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+      InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    TableSplit tSplit = (TableSplit) split;
+    LOG.info(MessageFormat.format("Input split length: {0} bytes.", tSplit.getLength()));
+
+    if (tSplit.getTable() == null) {
+      throw new IOException("Cannot create a record reader because of a"
+          + " previous error. Please look at the previous log lines from"
+          + " the task's full log for more details.");
+    }
+    final Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
+    Table table = connection.getTable(tSplit.getTable());
+
+    if (this.tableRecordReader == null) {
+      this.tableRecordReader = new TableRecordReader();
+    }
+    final TableRecordReader trr = this.tableRecordReader;
+
+    try {
+      Scan sc = tSplit.getScan();
+      sc.setStartRow(tSplit.getStartRow());
+      sc.setStopRow(tSplit.getEndRow());
+      trr.setScan(sc);
+      trr.setTable(table);
+      return new RecordReader<ImmutableBytesWritable, Result>() {
+
+        @Override
+        public void close() throws IOException {
+          trr.close();
+          connection.close();
+        }
+
+        @Override
+        public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
+          return trr.getCurrentKey();
+        }
+
+        @Override
+        public Result getCurrentValue() throws IOException, InterruptedException {
+          return trr.getCurrentValue();
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+          return trr.getProgress();
+        }
+
+        @Override
+        public void initialize(InputSplit inputsplit, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+          trr.initialize(inputsplit, context);
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+          return trr.nextKeyValue();
+        }
+      };
+    } catch (IOException ioe) {
+      // If there is an exception make sure that all
+      // resources are closed and released.
+      trr.close();
+      connection.close();
+      throw ioe;
+    }
+  }
+
+  /**
+   * Calculates the splits that will serve as input for the map tasks. One
+   * split is created for each region of each table that overlaps the key
+   * range of a configured scan.
+   *
+   * @param context The current job context.
+   * @return The list of input splits.
+   * @throws IOException When creating the list of splits fails.
+   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    if (scans.isEmpty()) {
+      throw new IOException("No scans were provided.");
+    }
+
+    Map<TableName, List<Scan>> tableMaps = new HashMap<>();
+    for (Scan scan : scans) {
+      byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
+      if (tableNameBytes == null) {
+        throw new IOException("A scan object did not have a table name");
+      }
+
+      TableName tableName = TableName.valueOf(tableNameBytes);
+
+      List<Scan> scanList = tableMaps.get(tableName);
+      if (scanList == null) {
+        scanList = new ArrayList<>();
+        tableMaps.put(tableName, scanList);
+      }
+      scanList.add(scan);
+    }
+
+    List<InputSplit> splits = new ArrayList<>();
+    for (Map.Entry<TableName, List<Scan>> entry : tableMaps.entrySet()) {
+      TableName tableName = entry.getKey();
+      List<Scan> scanList = entry.getValue();
+
+      try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration());
+        Table table = conn.getTable(tableName);
+        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
+                regionLocator, conn.getAdmin());
+        Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
+        for (Scan scan : scanList) {
+          if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
+            throw new IOException("Expecting at least one region for table: "
+                    + tableName.getNameAsString());
+          }
+          int count = 0;
+
+          byte[] startRow = scan.getStartRow();
+          byte[] stopRow = scan.getStopRow();
+
+          for (int i = 0; i < keys.getFirst().length; i++) {
+            if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
+              continue;
+            }
+
+            if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
+                    Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
+                    (stopRow.length == 0 || Bytes.compareTo(stopRow,
+                            keys.getFirst()[i]) > 0)) {
+              byte[] splitStart = startRow.length == 0 ||
+                      Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
+                      keys.getFirst()[i] : startRow;
+              byte[] splitStop = (stopRow.length == 0 ||
+                      Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
+                      keys.getSecond()[i].length > 0 ?
+                      keys.getSecond()[i] : stopRow;
+
+              HRegionLocation hregionLocation = regionLocator.getRegionLocation(
+                      keys.getFirst()[i], false);
+              String regionHostname = hregionLocation.getHostname();
+              HRegionInfo regionInfo = hregionLocation.getRegionInfo();
+              String encodedRegionName = regionInfo.getEncodedName();
+              long regionSize = sizeCalculator.getRegionSize(
+                      regionInfo.getRegionName());
+
+              TableSplit split = new TableSplit(table.getName(),
+                      scan, splitStart, splitStop, regionHostname,
+                      encodedRegionName, regionSize);
+
+              splits.add(split);
+
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Test if the given region is to be included in the InputSplit while
+   * splitting the regions of a table.
+   * <p>
+   * This optimization is effective when there is a specific reason to exclude
+   * an entire region from the M-R job (and hence from the InputSplit), based
+   * on the region's start and end keys. <br>
+   * Useful when we need to remember the last-processed top record and revisit
+   * the [last, current) interval for M-R processing, continuously. In addition
+   * to reducing the number of InputSplits, it reduces the load on the region
+   * server as well, due to the ordering of the keys. <br>
+   * <br>
+   * Note: It is possible that <code>endKey.length() == 0</code>, for the last
+   * (most recent) region. <br>
+   * Override this method if you want to bulk exclude regions altogether from
+   * M-R. By default, no region is excluded (i.e. all regions are included).
+   *
+   * @param startKey Start key of the region
+   * @param endKey End key of the region
+   * @return true, if this region needs to be included as part of the input
+   *         (default).
+   */
+  protected boolean includeRegionInSplit(final byte[] startKey,
+      final byte[] endKey) {
+    return true;
+  }
+
+  /**
+   * Allows subclasses to get the list of {@link Scan} objects.
+   */
+  protected List<Scan> getScans() {
+    return this.scans;
+  }
+
+  /**
+   * Allows subclasses to set the list of {@link Scan} objects.
+   *
+   * @param scans The list of {@link Scan} used to define the input
+   */
+  protected void setScans(List<Scan> scans) {
+    this.scans = scans;
+  }
+
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   *
+   * @param tableRecordReader A different {@link TableRecordReader}
+   *          implementation.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+}
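
As the includeRegionInSplit() javadoc above suggests, a job can drop regions wholesale by subclassing. A minimal sketch, assuming a fixed resume key; the class name ResumingMultiTableInputFormat and the cut-off "row-0500" are invented for illustration:

import org.apache.hadoop.hbase.mapreduce.MultiTableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Illustrative subclass that skips every region lying entirely before a
 * fixed cut-off key, so no splits (and no scanning load) are created for it.
 */
public class ResumingMultiTableInputFormat extends MultiTableInputFormat {

  private static final byte[] RESUME_FROM = Bytes.toBytes("row-0500"); // invented cut-off

  @Override
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    // Keep the region if it can still contain rows at or after the cut-off:
    // its end key is past the cut-off, or it is the last (open-ended) region.
    return endKey.length == 0 || Bytes.compareTo(endKey, RESUME_FROM) > 0;
  }
}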

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
new file mode 100644
index 0000000..4cc784f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -0,0 +1,176 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * <p>
+ * Hadoop output format that writes to one or more HBase tables. The key is
+ * taken to be the table name while the output value <em>must</em> be either a
+ * {@link Put} or a {@link Delete} instance. All tables must already exist, and
+ * all Puts and Deletes must reference only valid column families.
+ * </p>
+ *
+ * <p>
+ * Write-ahead logging (WAL) for Puts can be disabled by setting
+ * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
+ * Note that disabling write-ahead logging is only appropriate for jobs where
+ * loss of data due to region server failure can be tolerated (for example,
+ * because it is easy to rerun a bulk import).
+ * </p>
+ */
+@InterfaceAudience.Public
+public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
+  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
+  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
+  /** Property value to use write-ahead logging */
+  public static final boolean WAL_ON = true;
+  /** Property value to disable write-ahead logging */
+  public static final boolean WAL_OFF = false;
+  /**
+   * Record writer for outputting to multiple HTables.
+   */
+  protected static class MultiTableRecordWriter extends
+      RecordWriter<ImmutableBytesWritable, Mutation> {
+    private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
+    Connection connection;
+    Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
+    Configuration conf;
+    boolean useWriteAheadLogging;
+
+    /**
+     * @param conf
+     *          HBaseConfiguration to be used
+     * @param useWriteAheadLogging
+     *          whether to use write-ahead logging. This can be turned off
+     *          (<tt>false</tt>) to improve performance when bulk loading data.
+     */
+    public MultiTableRecordWriter(Configuration conf,
+        boolean useWriteAheadLogging) throws IOException {
+      LOG.debug("Created new MultiTableRecordWriter with WAL "
+          + (useWriteAheadLogging ? "on" : "off"));
+      this.conf = conf;
+      this.useWriteAheadLogging = useWriteAheadLogging;
+    }
+
+    /**
+     * @param tableName
+     *          the name of the table, as a string
+     * @return the named mutator
+     * @throws IOException
+     *           if there is a problem opening a table
+     */
+    BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
+      if (this.connection == null) {
+        this.connection = ConnectionFactory.createConnection(conf);
+      }
+      if (!mutatorMap.containsKey(tableName)) {
+        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
+
+        BufferedMutator mutator =
+            connection.getBufferedMutator(TableName.valueOf(tableName.get()));
+        mutatorMap.put(tableName, mutator);
+      }
+      return mutatorMap.get(tableName);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      for (BufferedMutator mutator : mutatorMap.values()) {
+        mutator.close();
+      }
+      if (connection != null) {
+        connection.close();
+      }
+    }
+
+    /**
+     * Writes an action (Put or Delete) to the specified table.
+     *
+     * @param tableName
+     *          the table being updated.
+     * @param action
+     *          the update, either a put or a delete.
+     * @throws IllegalArgumentException
+     *          if the action is not a put or a delete.
+     */
+    @Override
+    public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
+      BufferedMutator mutator = getBufferedMutator(tableName);
+      // The actions are not immutable, so we defensively copy them
+      if (action instanceof Put) {
+        Put put = new Put((Put) action);
+        put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL
+            : Durability.SKIP_WAL);
+        mutator.mutate(put);
+      } else if (action instanceof Delete) {
+        Delete delete = new Delete((Delete) action);
+        mutator.mutate(delete);
+      } else {
+        throw new IllegalArgumentException("action must be either Delete or Put");
+      }
+    }
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext context) throws IOException,
+      InterruptedException {
+    // We cannot verify ahead of time whether the table names the user passes
+    // actually exist, so there is nothing useful to check here.
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new TableOutputCommitter();
+  }
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
+        conf.getBoolean(WAL_PROPERTY, WAL_ON));
+  }
+
+}
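
To illustrate how a job feeds this output format, here is a hedged sketch of a reducer that fans each aggregated count out to two tables; the table names "summary" and "archive" and the column family "f" are placeholders.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** Illustrative reducer: writes the summed count for each key into two tables. */
public class FanOutReducer
    extends Reducer<Text, IntWritable, ImmutableBytesWritable, Put> {

  private static final ImmutableBytesWritable SUMMARY =
      new ImmutableBytesWritable(Bytes.toBytes("summary")); // placeholder table
  private static final ImmutableBytesWritable ARCHIVE =
      new ImmutableBytesWritable(Bytes.toBytes("archive")); // placeholder table

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("count"), Bytes.toBytes(sum));
    // The output key names the destination table; because the record writer
    // copies each mutation defensively, the same Put can be sent to both.
    context.write(SUMMARY, put);
    context.write(ARCHIVE, put);
  }
}

On the driver side, the relevant lines would be roughly job.setOutputFormatClass(MultiTableOutputFormat.class) and, when data loss on region server failure is tolerable, job.getConfiguration().setBoolean(MultiTableOutputFormat.WAL_PROPERTY, MultiTableOutputFormat.WAL_OFF), as described in the class comment.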

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..e7538a8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MultiTableSnapshotInputFormat generalizes {@link TableSnapshotInputFormat},
+ * allowing a MapReduce job to run over one or more table snapshots, with one
+ * or more scans configured for each. Internally, the input format delegates
+ * to {@link TableSnapshotInputFormat} and thus has the same performance
+ * advantages; see {@link TableSnapshotInputFormat} for more details.
+ * <p>
+ * Usage is similar to TableSnapshotInputFormat, with the following exception:
+ * initMultiTableSnapshotMapperJob takes in a map from snapshot name to a
+ * collection of scans. For each snapshot in the map, each corresponding scan
+ * will be applied; the overall dataset for the job is defined by the
+ * concatenation of the regions and tables included in each snapshot/scan
+ * pair.
+ * {@link TableMapReduceUtil#initMultiTableSnapshotMapperJob(java.util.Map,
+ * Class, Class, Class, org.apache.hadoop.mapreduce.Job, boolean,
+ * org.apache.hadoop.fs.Path)} can be used to configure the job.
+ * </p>
+ * <pre>{@code
+ * Job job = new Job(conf);
+ * Map<String, Collection<Scan>> snapshotScans = ImmutableMap.of(
+ *    "snapshot1", ImmutableList.of(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))),
+ *    "snapshot2", ImmutableList.of(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2")))
+ * );
+ * Path restoreDir = new Path("/tmp/snapshot_restore_dir");
+ * TableMapReduceUtil.initMultiTableSnapshotMapperJob(
+ *     snapshotScans, MyTableMapper.class, MyMapKeyOutput.class,
+ *     MyMapOutputValueWritable.class, job, true, restoreDir);
+ * }
+ * </pre>
+ * Internally, this input format restores each snapshot into a subdirectory of
+ * the given tmp directory. Input splits and record readers are created as
+ * described in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}
+ * (one per region).
+ * See {@link TableSnapshotInputFormat} for more notes on permissioning; the
+ * same caveats apply here.
+ *
+ * @see TableSnapshotInputFormat
+ * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
+ */
+@InterfaceAudience.Public
+public class MultiTableSnapshotInputFormat extends TableSnapshotInputFormat {
+
+  private final MultiTableSnapshotInputFormatImpl delegate;
+
+  public MultiTableSnapshotInputFormat() {
+    this.delegate = new MultiTableSnapshotInputFormatImpl();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+      throws IOException, InterruptedException {
+    List<TableSnapshotInputFormatImpl.InputSplit> splits =
+        delegate.getSplits(jobContext.getConfiguration());
+    List<InputSplit> rtn = Lists.newArrayListWithCapacity(splits.size());
+
+    for (TableSnapshotInputFormatImpl.InputSplit split : splits) {
+      rtn.add(new TableSnapshotInputFormat.TableSnapshotRegionSplit(split));
+    }
+
+    return rtn;
+  }
+
+  public static void setInput(Configuration configuration,
+      Map<String, Collection<Scan>> snapshotScans, Path tmpRestoreDir) throws IOException {
+    new MultiTableSnapshotInputFormatImpl().setInput(configuration, snapshotScans, tmpRestoreDir);
+  }
+}
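
Beyond the initMultiTableSnapshotMapperJob path shown in the class comment, the static setInput() hook above can be called directly when assembling a job by hand. A minimal sketch, with placeholder snapshot names, key ranges, and restore directory, that leaves mapper and output wiring to the caller:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class SnapshotScanSetup {

  public static Job configureJob() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Placeholder snapshot names and key ranges; one or more scans per snapshot.
    Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
    snapshotScans.put("snapshot1",
        Arrays.asList(new Scan(Bytes.toBytes("a"), Bytes.toBytes("b"))));
    snapshotScans.put("snapshot2",
        Arrays.asList(new Scan(Bytes.toBytes("1"), Bytes.toBytes("2"))));

    // Each snapshot is restored under this directory before the job runs.
    Path restoreDir = new Path("/tmp/snapshot_restore_dir"); // placeholder path
    MultiTableSnapshotInputFormat.setInput(conf, snapshotScans, restoreDir);

    Job job = Job.getInstance(conf, "multi-snapshot-scan");
    job.setInputFormatClass(MultiTableSnapshotInputFormat.class);
    // Mapper class, output key/value types, and dependency jars still need to
    // be configured, e.g. via TableMapReduceUtil.
    return job;
  }
}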