Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:45 UTC

[45/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
new file mode 100644
index 0000000..9cccf8c
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -0,0 +1,386 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool used to copy a table to another one, which can be on a different cluster setup.
+ * It is also configurable with a start and end time, as well as a specification
+ * of the region server implementation if different from the local cluster.
+ */
+@InterfaceAudience.Public
+public class CopyTable extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(CopyTable.class);
+
+  final static String NAME = "copytable";
+  long startTime = 0;
+  long endTime = HConstants.LATEST_TIMESTAMP;
+  int batch = Integer.MAX_VALUE;
+  int cacheRow = -1;
+  int versions = -1;
+  String tableName = null;
+  String startRow = null;
+  String stopRow = null;
+  String dstTableName = null;
+  String peerAddress = null;
+  String families = null;
+  boolean allCells = false;
+  static boolean shuffle = false;
+
+  boolean bulkload = false;
+  Path bulkloadDir = null;
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args)
+  throws IOException {
+    if (!doCommandLine(args)) {
+      return null;
+    }
+
+    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(CopyTable.class);
+    Scan scan = new Scan();
+
+    scan.setBatch(batch);
+    scan.setCacheBlocks(false);
+
+    if (cacheRow > 0) {
+      scan.setCaching(cacheRow);
+    } else {
+      scan.setCaching(getConf().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 100));
+    }
+
+    scan.setTimeRange(startTime, endTime);
+
+    if (allCells) {
+      scan.setRaw(true);
+    }
+    if (shuffle) {
+      job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
+    }
+    if (versions >= 0) {
+      scan.setMaxVersions(versions);
+    }
+
+    if (startRow != null) {
+      scan.setStartRow(Bytes.toBytesBinary(startRow));
+    }
+
+    if (stopRow != null) {
+      scan.setStopRow(Bytes.toBytesBinary(stopRow));
+    }
+
+    if(families != null) {
+      String[] fams = families.split(",");
+      Map<String,String> cfRenameMap = new HashMap<>();
+      for(String fam : fams) {
+        String sourceCf;
+        if(fam.contains(":")) {
+            // fam looks like "sourceCfName:destCfName"
+            String[] srcAndDest = fam.split(":", 2);
+            sourceCf = srcAndDest[0];
+            String destCf = srcAndDest[1];
+            cfRenameMap.put(sourceCf, destCf);
+        } else {
+            // fam is just "sourceCf"
+            sourceCf = fam;
+        }
+        scan.addFamily(Bytes.toBytes(sourceCf));
+      }
+      Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
+    }
+    job.setNumReduceTasks(0);
+
+    if (bulkload) {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
+        null, job);
+
+      // We need to split the inputs by destination tables so that the output of the Map can be bulk-loaded.
+      TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
+
+      FileSystem fs = FileSystem.get(getConf());
+      Random rand = new Random();
+      Path root = new Path(fs.getWorkingDirectory(), "copytable");
+      fs.mkdirs(root);
+      while (true) {
+        bulkloadDir = new Path(root, "" + rand.nextLong());
+        if (!fs.exists(bulkloadDir)) {
+          break;
+        }
+      }
+
+      System.out.println("HFiles will be stored at " + this.bulkloadDir);
+      HFileOutputFormat2.setOutputPath(job, bulkloadDir);
+      try (Connection conn = ConnectionFactory.createConnection(getConf());
+          Admin admin = conn.getAdmin()) {
+        HFileOutputFormat2.configureIncrementalLoadMap(job,
+            admin.listTableDescriptor((TableName.valueOf(dstTableName))));
+      }
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan,
+        Import.Importer.class, null, null, job);
+
+      TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
+        null);
+    }
+
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
+        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" rs.class     hbase.regionserver.class of the peer cluster");
+    System.err.println("              specify if different from current cluster");
+    System.err.println(" rs.impl      hbase.regionserver.impl of the peer cluster");
+    System.err.println(" startrow     the start row");
+    System.err.println(" stoprow      the stop row");
+    System.err.println(" starttime    beginning of the time range (unixtime in millis)");
+    System.err.println("              without endtime means from starttime to forever");
+    System.err.println(" endtime      end of the time range.  Ignored if no starttime specified.");
+    System.err.println(" versions     number of cell versions to copy");
+    System.err.println(" new.name     new table's name");
+    System.err.println(" peer.adr     Address of the peer cluster given in the format");
+    System.err.println("              hbase.zookeeper.quorum:hbase.zookeeper.client"
+        + ".port:zookeeper.znode.parent");
+    System.err.println(" families     comma-separated list of families to copy");
+    System.err.println("              To copy from cf1 to cf2, give sourceCfName:destCfName. ");
+    System.err.println("              To keep the same name, just give \"cfName\"");
+    System.err.println(" all.cells    also copy delete markers and deleted cells");
+    System.err.println(" bulkload     Write input into HFiles and bulk load to the destination "
+        + "table");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" tablename    Name of the table to copy");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
+    System.err.println(" $ hbase " +
+        "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
+        "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
+    System.err.println("For performance consider the following general option:\n"
+        + "  It is recommended that you set the following to >=100. A higher value uses more memory but\n"
+        + "  decreases the round trip time to the server and may increase performance.\n"
+        + "    -Dhbase.client.scanner.caching=100\n"
+        + "  The following should always be set to false, to prevent writing data twice, which may produce \n"
+        + "  inaccurate results.\n"
+        + "    -Dmapreduce.map.speculative=false");
+  }
+
+  private boolean doCommandLine(final String[] args) {
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    if (args.length < 1) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String startRowArgKey = "--startrow=";
+        if (cmd.startsWith(startRowArgKey)) {
+          startRow = cmd.substring(startRowArgKey.length());
+          continue;
+        }
+
+        final String stopRowArgKey = "--stoprow=";
+        if (cmd.startsWith(stopRowArgKey)) {
+          stopRow = cmd.substring(stopRowArgKey.length());
+          continue;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String batchArgKey = "--batch=";
+        if (cmd.startsWith(batchArgKey)) {
+          batch = Integer.parseInt(cmd.substring(batchArgKey.length()));
+          continue;
+        }
+
+        final String cacheRowArgKey = "--cacheRow=";
+        if (cmd.startsWith(cacheRowArgKey)) {
+          cacheRow = Integer.parseInt(cmd.substring(cacheRowArgKey.length()));
+          continue;
+        }
+
+        final String versionsArgKey = "--versions=";
+        if (cmd.startsWith(versionsArgKey)) {
+          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+          continue;
+        }
+
+        final String newNameArgKey = "--new.name=";
+        if (cmd.startsWith(newNameArgKey)) {
+          dstTableName = cmd.substring(newNameArgKey.length());
+          continue;
+        }
+
+        final String peerAdrArgKey = "--peer.adr=";
+        if (cmd.startsWith(peerAdrArgKey)) {
+          peerAddress = cmd.substring(peerAdrArgKey.length());
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        if (cmd.startsWith("--all.cells")) {
+          allCells = true;
+          continue;
+        }
+
+        if (cmd.startsWith("--bulkload")) {
+          bulkload = true;
+          continue;
+        }
+
+        if (cmd.startsWith("--shuffle")) {
+          shuffle = true;
+          continue;
+        }
+
+        if (i == args.length-1) {
+          tableName = cmd;
+        } else {
+          printUsage("Invalid argument '" + cmd + "'");
+          return false;
+        }
+      }
+      if (dstTableName == null && peerAddress == null) {
+        printUsage("At least a new table name or a " +
+            "peer address must be specified");
+        return false;
+      }
+      if ((endTime != 0) && (startTime > endTime)) {
+        printUsage("Invalid time range filter: starttime=" + startTime + " >  endtime=" + endTime);
+        return false;
+      }
+
+      if (bulkload && peerAddress != null) {
+        printUsage("Remote bulkload is not supported!");
+        return false;
+      }
+
+      // set dstTableName if necessary
+      if (dstTableName == null) {
+        dstTableName = tableName;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = createSubmittableJob(args);
+    if (job == null) return 1;
+    if (!job.waitForCompletion(true)) {
+      LOG.info("Map-reduce job failed!");
+      if (bulkload) {
+        LOG.info("Files are not bulkloaded!");
+      }
+      return 1;
+    }
+    int code = 0;
+    if (bulkload) {
+      code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
+          this.dstTableName});
+      if (code == 0) {
+        // bulkloadDir is deleted only if LoadIncrementalHFiles was successful, so that one can
+        // rerun LoadIncrementalHFiles.
+        FileSystem fs = FileSystem.get(this.getConf());
+        if (!fs.delete(this.bulkloadDir, true)) {
+          LOG.error("Deleting folder " + bulkloadDir + " failed!");
+          code = 1;
+        }
+      }
+    }
+    return code;
+  }
+}
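For reference, the Tool wiring above can be driven programmatically the same way CopyTable.main() does, via ToolRunner. A minimal sketch follows; the table and family names are hypothetical, and the flags are the ones parsed in doCommandLine():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.util.ToolRunner;

public class CopyTableDriverExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Flags mirror CopyTable#doCommandLine: destination name, families, then the source table.
    int exitCode = ToolRunner.run(conf, new CopyTable(), new String[] {
        "--new.name=TestTableCopy",        // hypothetical destination table
        "--families=myOldCf:myNewCf,cf2",  // rename myOldCf to myNewCf, copy cf2 as-is
        "TestTable"                        // hypothetical source table
    });
    System.exit(exitCode);
  }
}

The same arguments work on the command line via "hbase org.apache.hadoop.hbase.mapreduce.CopyTable", as shown by printUsage().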

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
new file mode 100644
index 0000000..004ee5c
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/DefaultVisibilityExpressionResolver.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.visibility.Authorizations;
+import org.apache.hadoop.hbase.security.visibility.VisibilityConstants;
+import org.apache.hadoop.hbase.security.visibility.VisibilityLabelOrdinalProvider;
+import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This implementation creates tags by expanding the expression using label ordinals. Labels
+ * will be serialized in sorted order of their ordinals.
+ */
+@InterfaceAudience.Private
+public class DefaultVisibilityExpressionResolver implements VisibilityExpressionResolver {
+  private static final Log LOG = LogFactory.getLog(DefaultVisibilityExpressionResolver.class);
+
+  private Configuration conf;
+  private final Map<String, Integer> labels = new HashMap<>();
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void init() {
+    // Reading all the labels and their ordinals.
+    // This scan should be done by a user with global_admin privileges. Ensure that it works.
+    Table labelsTable = null;
+    Connection connection = null;
+    try {
+      connection = ConnectionFactory.createConnection(conf);
+      try {
+        labelsTable = connection.getTable(LABELS_TABLE_NAME);
+      } catch (IOException e) {
+        LOG.error("Error opening 'labels' table", e);
+        return;
+      }
+      Scan scan = new Scan();
+      scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
+      scan.addColumn(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
+      ResultScanner scanner = null;
+      try {
+        scanner = labelsTable.getScanner(scan);
+        Result next = null;
+        while ((next = scanner.next()) != null) {
+          byte[] row = next.getRow();
+          byte[] value = next.getValue(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
+          labels.put(Bytes.toString(value), Bytes.toInt(row));
+        }
+      } catch (TableNotFoundException e) {
+        // Table not found. So just return
+        return;
+      } catch (IOException e) {
+        LOG.error("Error scanning 'labels' table", e);
+      } finally {
+        if (scanner != null) scanner.close();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Failed reading 'labels' tags", ioe);
+      return;
+    } finally {
+      if (labelsTable != null) {
+        try {
+          labelsTable.close();
+        } catch (IOException ioe) {
+          LOG.warn("Error closing 'labels' table", ioe);
+        }
+      }
+      if (connection != null)
+        try {
+          connection.close();
+        } catch (IOException ioe) {
+          LOG.warn("Failed close of temporary connection", ioe);
+        }
+    }
+  }
+
+  @Override
+  public List<Tag> createVisibilityExpTags(String visExpression) throws IOException {
+    VisibilityLabelOrdinalProvider provider = new VisibilityLabelOrdinalProvider() {
+      @Override
+      public int getLabelOrdinal(String label) {
+        Integer ordinal = null;
+        ordinal = labels.get(label);
+        if (ordinal != null) {
+          return ordinal.intValue();
+        }
+        return VisibilityConstants.NON_EXIST_LABEL_ORDINAL;
+      }
+
+      @Override
+      public String getLabel(int ordinal) {
+        // Unused
+        throw new UnsupportedOperationException(
+            "getLabel should not be used in VisibilityExpressionResolver");
+      }
+    };
+    return VisibilityUtils.createVisibilityExpTags(visExpression, true, false, null, provider);
+  }
+}
\ No newline at end of file
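As an illustration of the init()/createVisibilityExpTags() contract above, here is a minimal sketch. It assumes the visibility feature is enabled, the labels in the expression exist, and the caller may scan the labels table; note the class is @InterfaceAudience.Private, so this is illustrative only:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.mapreduce.DefaultVisibilityExpressionResolver;

public class VisibilityTagSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    DefaultVisibilityExpressionResolver resolver = new DefaultVisibilityExpressionResolver();
    resolver.setConf(conf);
    resolver.init();  // scans the labels table to build the label -> ordinal map
    // "secret" and "public" are hypothetical labels.
    List<Tag> tags = resolver.createVisibilityExpTags("secret&!public");
    System.out.println("Resolved expression into " + tags.size() + " tag(s)");
  }
}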

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
new file mode 100644
index 0000000..9737b55
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.util.ProgramDriver;
+
+/**
+ * Driver for HBase mapreduce jobs. Select which one to run by passing the
+ * name of the job to this main.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+@InterfaceStability.Stable
+public class Driver {
+  /**
+   * @param args
+   * @throws Throwable
+   */
+  public static void main(String[] args) throws Throwable {
+    ProgramDriver pgd = new ProgramDriver();
+
+    pgd.addClass(RowCounter.NAME, RowCounter.class,
+      "Count rows in HBase table.");
+    pgd.addClass(CellCounter.NAME, CellCounter.class,
+      "Count cells in HBase table.");
+    pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
+    pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
+    pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
+    pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
+                 "Complete a bulk data load.");
+    pgd.addClass(CopyTable.NAME, CopyTable.class,
+        "Export a table from local cluster to peer cluster.");
+    pgd.addClass(VerifyReplication.NAME, VerifyReplication.class, "Compare" +
+        " the data from tables in two different clusters. WARNING: It" +
+        " doesn't work for incrementColumnValues'd cells since the" +
+        " timestamp is changed after being appended to the log.");
+    pgd.addClass(WALPlayer.NAME, WALPlayer.class, "Replay WAL files.");
+    pgd.addClass(ExportSnapshot.NAME, ExportSnapshot.class, "Export" +
+        " the specific snapshot to a given FileSystem.");
+
+    ProgramDriver.class.getMethod("driver", new Class [] {String[].class}).
+      invoke(pgd, new Object[]{args});
+  }
+}
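The ProgramDriver pattern above can be reused for a smaller, custom dispatcher. The sketch below is hypothetical; only two tools are registered, but the reflective driver call is the same one Driver uses:

import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.util.ProgramDriver;

public class MiniDriver {
  public static void main(String[] args) throws Throwable {
    ProgramDriver pgd = new ProgramDriver();
    pgd.addClass("rowcounter", RowCounter.class, "Count rows in HBase table.");
    pgd.addClass("copytable", CopyTable.class, "Copy a table to a peer cluster.");
    // Dispatches on args[0]; e.g. {"rowcounter", "TestTable"} runs RowCounter.
    ProgramDriver.class.getMethod("driver", new Class[] { String[].class })
        .invoke(pgd, new Object[] { args });
  }
}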

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
new file mode 100644
index 0000000..de6cf3a
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Export.java
@@ -0,0 +1,197 @@
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Export an HBase table.
+ * Writes content to sequence files up in HDFS.  Use {@link Import} to read it
+ * back in again.
+ */
+@InterfaceAudience.Public
+public class Export extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(Export.class);
+  final static String NAME = "export";
+  final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
+  final static String EXPORT_BATCHING = "hbase.export.scanner.batch";
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param conf  The current configuration.
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    String tableName = args[0];
+    Path outputDir = new Path(args[1]);
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJobName(NAME + "_" + tableName);
+    job.setJarByClass(Export.class);
+    // Set optional scan parameters
+    Scan s = getConfiguredScanForJob(conf, args);
+    IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);
+    // No reducers.  Just write straight to output files.
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(Result.class);
+    FileOutputFormat.setOutputPath(job, outputDir); // job conf doesn't contain the conf so doesn't have a default fs.
+    return job;
+  }
+
+  private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
+    Scan s = new Scan();
+    // Optional arguments.
+    // Set Scan Versions
+    int versions = args.length > 2? Integer.parseInt(args[2]): 1;
+    s.setMaxVersions(versions);
+    // Set Scan Range
+    long startTime = args.length > 3? Long.parseLong(args[3]): 0L;
+    long endTime = args.length > 4? Long.parseLong(args[4]): Long.MAX_VALUE;
+    s.setTimeRange(startTime, endTime);
+    // Set cache blocks
+    s.setCacheBlocks(false);
+    // set Start and Stop row
+    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
+      s.setStartRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_START)));
+    }
+    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
+      s.setStopRow(Bytes.toBytesBinary(conf.get(TableInputFormat.SCAN_ROW_STOP)));
+    }
+    // Set Scan Column Family
+    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
+    if (raw) {
+      s.setRaw(raw);
+    }
+    for (String columnFamily : conf.getTrimmedStrings(TableInputFormat.SCAN_COLUMN_FAMILY)) {
+      s.addFamily(Bytes.toBytes(columnFamily));
+    }
+    // Set RowFilter or Prefix Filter if applicable.
+    Filter exportFilter = getExportFilter(args);
+    if (exportFilter!= null) {
+        LOG.info("Setting Scan Filter for Export.");
+      s.setFilter(exportFilter);
+    }
+
+    int batching = conf.getInt(EXPORT_BATCHING, -1);
+    if (batching !=  -1){
+      try {
+        s.setBatch(batching);
+      } catch (IncompatibleFilterException e) {
+        LOG.error("Batching could not be set", e);
+      }
+    }
+    LOG.info("versions=" + versions + ", starttime=" + startTime +
+      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
+    return s;
+  }
+
+  private static Filter getExportFilter(String[] args) {
+    Filter exportFilter = null;
+    String filterCriteria = (args.length > 5) ? args[5]: null;
+    if (filterCriteria == null) return null;
+    if (filterCriteria.startsWith("^")) {
+      String regexPattern = filterCriteria.substring(1, filterCriteria.length());
+      exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
+    } else {
+      exportFilter = new PrefixFilter(Bytes.toBytesBinary(filterCriteria));
+    }
+    return exportFilter;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
+      "[<starttime> [<endtime>]] [^[regex pattern] or [Prefix] to filter]]\n");
+    System.err.println("  Note: -D properties will be applied to the conf used. ");
+    System.err.println("  For example: ");
+    System.err.println("   -D mapreduce.output.fileoutputformat.compress=true");
+    System.err.println("   -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec");
+    System.err.println("   -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
+    System.err.println("  Additionally, the following SCAN properties can be specified");
+    System.err.println("  to control/limit what is exported..");
+    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<family1>,<family2>, ...");
+    System.err.println("   -D " + RAW_SCAN + "=true");
+    System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
+    System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
+    System.err.println("   -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the export");
+    System.err.println("For performance consider the following properties:\n"
+        + "   -Dhbase.client.scanner.caching=100\n"
+        + "   -Dmapreduce.map.speculative=false\n"
+        + "   -Dmapreduce.reduce.speculative=false");
+    System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
+        + "   -D" + EXPORT_BATCHING + "=10");
+  }
+
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      return -1;
+    }
+    Job job = createSubmittableJob(getConf(), args);
+    return (job.waitForCompletion(true) ? 0 : 1);
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args);
+    System.exit(errCode);
+  }
+}
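Like CopyTable, Export is a Tool, so the positional arguments handled in run() can be supplied through ToolRunner. A minimal sketch, with a hypothetical table and output directory and the tuning keys mentioned in usage():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.Export;
import org.apache.hadoop.util.ToolRunner;

public class ExportExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Optional scan tuning; the values here are illustrative.
    conf.setBoolean("hbase.mapreduce.include.deleted.rows", true);  // RAW_SCAN
    conf.setInt("hbase.export.scanner.batch", 10);                  // EXPORT_BATCHING
    // Positional args: <tablename> <outputdir> [<versions> [<starttime> [<endtime>]]]
    int exitCode = ToolRunner.run(conf, new Export(), new String[] {
        "TestTable",          // hypothetical source table
        "/export/TestTable",  // hypothetical HDFS output directory
        "1"                   // copy a single version of each cell
    });
    System.exit(exitCode);
  }
}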

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
new file mode 100644
index 0000000..dc30c6e
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/GroupingTableMapper.java
@@ -0,0 +1,177 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Extract grouping columns from input record.
+ */
+@InterfaceAudience.Public
+public class GroupingTableMapper
+extends TableMapper<ImmutableBytesWritable,Result> implements Configurable {
+
+  /**
+   * JobConf parameter to specify the columns used to produce the key passed to
+   * collect from the map phase.
+   */
+  public static final String GROUP_COLUMNS =
+    "hbase.mapred.groupingtablemap.columns";
+
+  /** The grouping columns. */
+  protected byte [][] columns;
+  /** The current configuration. */
+  private Configuration conf = null;
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table The table to be processed.
+   * @param scan  The scan with the columns etc.
+   * @param groupColumns  A space separated list of columns used to form the
+   * key used in collect.
+   * @param mapper  The mapper class.
+   * @param job  The current job.
+   * @throws IOException When setting up the job fails.
+   */
+  @SuppressWarnings("unchecked")
+  public static void initJob(String table, Scan scan, String groupColumns,
+    Class<? extends TableMapper> mapper, Job job) throws IOException {
+    TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
+        ImmutableBytesWritable.class, Result.class, job);
+    job.getConfiguration().set(GROUP_COLUMNS, groupColumns);
+  }
+
+  /**
+   * Extract the grouping columns from value to construct a new key. Pass the
+   * new key and value to reduce. If any of the grouping columns are not found
+   * in the value, the record is skipped.
+   *
+   * @param key  The current key.
+   * @param value  The current value.
+   * @param context  The current context.
+   * @throws IOException When writing the record fails.
+   * @throws InterruptedException When the job is aborted.
+   */
+  @Override
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+  throws IOException, InterruptedException {
+    byte[][] keyVals = extractKeyValues(value);
+    if(keyVals != null) {
+      ImmutableBytesWritable tKey = createGroupKey(keyVals);
+      context.write(tKey, value);
+    }
+  }
+
+  /**
+   * Extract column values from the current record. This method returns
+   * null if any of the columns are not found.
+   * <p>
+   * Override this method if you want to deal with nulls differently.
+   *
+   * @param r  The current values.
+   * @return Array of byte values.
+   */
+  protected byte[][] extractKeyValues(Result r) {
+    byte[][] keyVals = null;
+    ArrayList<byte[]> foundList = new ArrayList<>();
+    int numCols = columns.length;
+    if (numCols > 0) {
+      for (Cell value: r.listCells()) {
+        byte [] column = KeyValue.makeColumn(CellUtil.cloneFamily(value),
+            CellUtil.cloneQualifier(value));
+        for (int i = 0; i < numCols; i++) {
+          if (Bytes.equals(column, columns[i])) {
+            foundList.add(CellUtil.cloneValue(value));
+            break;
+          }
+        }
+      }
+      if(foundList.size() == numCols) {
+        keyVals = foundList.toArray(new byte[numCols][]);
+      }
+    }
+    return keyVals;
+  }
+
+  /**
+   * Create a key by concatenating multiple column values.
+   * <p>
+   * Override this function in order to produce different types of keys.
+   *
+   * @param vals  The current key/values.
+   * @return A key generated by concatenating multiple column values.
+   */
+  protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
+    if(vals == null) {
+      return null;
+    }
+    StringBuilder sb =  new StringBuilder();
+    for(int i = 0; i < vals.length; i++) {
+      if(i > 0) {
+        sb.append(" ");
+      }
+      sb.append(Bytes.toString(vals[i]));
+    }
+    return new ImmutableBytesWritable(Bytes.toBytesBinary(sb.toString()));
+  }
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to set up the grouping details.
+   *
+   * @param configuration  The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *   org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+    String[] cols = conf.get(GROUP_COLUMNS, "").split(" ");
+    columns = new byte[cols.length][];
+    for(int i = 0; i < cols.length; i++) {
+      columns[i] = Bytes.toBytes(cols[i]);
+    }
+  }
+
+}
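To show how initJob() and GROUP_COLUMNS fit together, here is a minimal job-setup sketch. The table and the cf:country / cf:city columns are hypothetical, and the job is only configured, not submitted:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.GroupingTableMapper;
import org.apache.hadoop.mapreduce.Job;

public class GroupingJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "grouping-example");
    job.setJarByClass(GroupingJobSketch.class);
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    // Map output keys are built from the values of cf:country and cf:city,
    // space-separated as required by GROUP_COLUMNS.
    GroupingTableMapper.initJob("TestTable", scan, "cf:country cf:city",
        GroupingTableMapper.class, job);
    // A real job would also set a reducer and an output format before submission.
  }
}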

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
new file mode 100644
index 0000000..e90d5c1
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple MR input format for HFiles.
+ * This code was borrowed from the Apache Crunch project and updated to the
+ * recent version of HBase.
+ */
+public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);
+
+  /**
+   * File filter that removes all "hidden" files. This might be something worth removing from
+   * a more general purpose utility; it accounts for the presence of metadata files created
+   * in the way we're doing exports.
+   */
+  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Record reader for HFiles.
+   */
+  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
+
+    private Reader in;
+    protected Configuration conf;
+    private HFileScanner scanner;
+
+    /**
+     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+     */
+    private Cell value = null;
+    private long count;
+    private boolean seeked = false;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      conf = context.getConfiguration();
+      Path path = fileSplit.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+      LOG.info("Initialize HFileRecordReader for {}", path);
+      this.in = HFile.createReader(fs, path, conf);
+
+      // The file info must be loaded before the scanner can be used.
+      // This seems like a bug in HBase, but it's easily worked around.
+      this.in.loadFileInfo();
+      this.scanner = in.getScanner(false, false);
+
+    }
+
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean hasNext;
+      if (!seeked) {
+        LOG.info("Seeking to start");
+        hasNext = scanner.seekTo();
+        seeked = true;
+      } else {
+        hasNext = scanner.next();
+      }
+      if (!hasNext) {
+        return false;
+      }
+      value = scanner.getCell();
+      count++;
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public Cell getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
+      // the start row, but better than nothing anyway.
+      return 1.0f * count / in.getEntries();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+        in = null;
+      }
+    }
+  }
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+
+    // Explode out directories that match the original FileInputFormat filters
+    // since HFiles are written to directories where the
+    // directory name is the column name
+    for (FileStatus status : super.listStatus(job)) {
+      if (status.isDirectory()) {
+        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+          result.add(match);
+        }
+      } else {
+        result.add(status);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new HFileRecordReader();
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    // This file isn't splittable.
+    return false;
+  }
+}
\ No newline at end of file
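A short sketch of wiring this input format into a job: each map task then receives (NullWritable, Cell) pairs from a whole HFile, since isSplitable() returns false. The input path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class HFileReadJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "read-hfiles-example");
    job.setJarByClass(HFileReadJobSketch.class);
    job.setInputFormatClass(HFileInputFormat.class);
    // Directories are expanded by listStatus(), so a column-family directory
    // produced by a bulk-load job can be given directly.
    FileInputFormat.addInputPath(job, new Path("/bulkload/output/cf"));  // hypothetical path
    job.setNumReduceTasks(0);
    // A mapper consuming (NullWritable, Cell) and an output format would be set here.
  }
}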

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
new file mode 100644
index 0000000..7fea254
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -0,0 +1,902 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Writes HFiles. Passed Cells must arrive in order.
+ * Writes current time as the sequence id for the file. Sets the major compacted
+ * attribute on created {@link HFile}s. Calling write(null,null) will forcibly roll
+ * all HFiles being written.
+ * <p>
+ * Using this class as part of a MapReduce job is best done
+ * using {@link #configureIncrementalLoad(Job, TableDescriptor, RegionLocator)}.
+ */
+@InterfaceAudience.Public
+public class HFileOutputFormat2
+    extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+  private static final Log LOG = LogFactory.getLog(HFileOutputFormat2.class);
+  static class TableInfo {
+    private TableDescriptor tableDesctiptor;
+    private RegionLocator regionLocator;
+
+    public TableInfo(TableDescriptor tableDesctiptor, RegionLocator regionLocator) {
+      this.tableDesctiptor = tableDesctiptor;
+      this.regionLocator = regionLocator;
+    }
+
+    /**
+     * Modifications to the returned HTD do not affect the inner TD.
+     * @return A clone of the inner table descriptor
+     * @deprecated use {@link #getTableDescriptor}
+     */
+    @Deprecated
+    public HTableDescriptor getHTableDescriptor() {
+      return new HTableDescriptor(tableDesctiptor);
+    }
+
+    public TableDescriptor getTableDescriptor() {
+      return tableDesctiptor;
+    }
+
+    public RegionLocator getRegionLocator() {
+      return regionLocator;
+    }
+  }
+
+  protected static final byte[] tableSeparator = ";".getBytes(StandardCharsets.UTF_8);
+
+  protected static byte[] combineTableNameSuffix(byte[] tableName,
+                                       byte[] suffix ) {
+    return Bytes.add(tableName, tableSeparator, suffix);
+  }
+
+  // The following constants are private since these are used by
+  // HFileOutputFormat2 to internally transfer data between job setup and
+  // reducer run using conf.
+  // These should not be changed by the client.
+  static final String COMPRESSION_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.compression";
+  static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+      "hbase.hfileoutputformat.families.bloomtype";
+  static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.blocksize";
+  static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+
+  // This constant is public since the client can modify this when setting
+  // up their conf object and thus refer to this symbol.
+  // It is present for backwards compatibility reasons. Use it only to
+  // override the auto-detection of datablock encoding.
+  public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
+  /**
+   * Keep locality while generating HFiles for bulkload. See HBASE-12596
+   */
+  public static final String LOCALITY_SENSITIVE_CONF_KEY =
+      "hbase.bulkload.locality.sensitive.enabled";
+  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+  static final String OUTPUT_TABLE_NAME_CONF_KEY =
+      "hbase.mapreduce.hfileoutputformat.table.name";
+  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+          "hbase.mapreduce.use.multi.table.hfileoutputformat";
+
+  public static final String STORAGE_POLICY_PROPERTY = "hbase.hstore.storagepolicy";
+  public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX = STORAGE_POLICY_PROPERTY + ".";
+
+  @Override
+  public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+      final TaskAttemptContext context) throws IOException, InterruptedException {
+    return createRecordWriter(context);
+  }
+
+  protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[] family) {
+    return combineTableNameSuffix(tableName, family);
+  }
+
+  static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+      createRecordWriter(final TaskAttemptContext context)
+          throws IOException {
+
+    // Get the path of the temporary output file
+    final Path outputPath = FileOutputFormat.getOutputPath(context);
+    final Path outputDir = new FileOutputCommitter(outputPath, context).getWorkPath();
+    final Configuration conf = context.getConfiguration();
+    final boolean writeMultipleTables = conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
+    final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+    if (writeTableNames==null || writeTableNames.isEmpty()) {
+      throw new IllegalArgumentException("Configuration parameter " + OUTPUT_TABLE_NAME_CONF_KEY
+              + " cannot be empty");
+    }
+    final FileSystem fs = outputDir.getFileSystem(conf);
+    // These configs. are from hbase-*.xml
+    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+        HConstants.DEFAULT_MAX_FILE_SIZE);
+    // Invented config.  Add to hbase-*.xml if other than default compression.
+    final String defaultCompressionStr = conf.get("hfile.compression",
+        Compression.Algorithm.NONE.getName());
+    final Algorithm defaultCompression = HFileWriterImpl
+        .compressionByName(defaultCompressionStr);
+    final boolean compactionExclude = conf.getBoolean(
+        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+    final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
+            Bytes.toString(tableSeparator))).collect(Collectors.toSet());
+
+    // create a map from column family to the compression algorithm
+    final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
+    final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
+    final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
+
+    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
+    final Map<byte[], DataBlockEncoding> datablockEncodingMap
+        = createFamilyDataBlockEncodingMap(conf);
+    final DataBlockEncoding overriddenEncoding;
+    if (dataBlockEncodingStr != null) {
+      overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
+    } else {
+      overriddenEncoding = null;
+    }
+
+    return new RecordWriter<ImmutableBytesWritable, V>() {
+      // Map of families to writers and how much has been output on the writer.
+      private final Map<byte[], WriterLength> writers =
+              new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final byte[] now = Bytes.toBytes(EnvironmentEdgeManager.currentTime());
+      private boolean rollRequested = false;
+
+      @Override
+      public void write(ImmutableBytesWritable row, V cell)
+          throws IOException {
+        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+
+        // null input == user explicitly wants to flush
+        if (row == null && kv == null) {
+          rollWriters();
+          return;
+        }
+
+        byte[] rowKey = CellUtil.cloneRow(kv);
+        long length = kv.getLength();
+        byte[] family = CellUtil.cloneFamily(kv);
+        byte[] tableNameBytes = null;
+        if (writeMultipleTables) {
+          tableNameBytes = MultiTableHFileOutputFormat.getTableName(row.get());
+          if (!allTableNames.contains(Bytes.toString(tableNameBytes))) {
+            throw new IllegalArgumentException("TableName '" + Bytes.toString(tableNameBytes) +
+                    "' not expected");
+          }
+        } else {
+          tableNameBytes = writeTableNames.getBytes(StandardCharsets.UTF_8);
+        }
+        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
+        WriterLength wl = this.writers.get(tableAndFamily);
+
+        // If this is a new column family, create its output directory and set the
+        // storage policy.
+        if (wl == null) {
+          Path writerPath = null;
+          if (writeMultipleTables) {
+            writerPath = new Path(outputDir,
+                new Path(Bytes.toString(tableNameBytes), Bytes.toString(family)));
+          } else {
+            writerPath = new Path(outputDir, Bytes.toString(family));
+          }
+          fs.mkdirs(writerPath);
+          configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
+        }
+
+        // If any of the HFiles for the column families has reached
+        // maxsize, we need to roll all the writers
+        if (wl != null && wl.written + length >= maxsize) {
+          this.rollRequested = true;
+        }
+
+        // Rolling can only happen once the current row is finished, so all cells of a row
+        // end up in the same set of HFiles.
+        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+          rollWriters();
+        }
+
+        // Create a new HFile writer, if necessary.
+        if (wl == null || wl.writer == null) {
+          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+            HRegionLocation loc = null;
+
+            String tableName = Bytes.toString(tableNameBytes);
+            if (tableName != null) {
+              try (Connection connection = ConnectionFactory.createConnection(conf);
+                     RegionLocator locator =
+                       connection.getRegionLocator(TableName.valueOf(tableName))) {
+                loc = locator.getRegionLocation(rowKey);
+              } catch (Throwable e) {
+                LOG.warn("Failed to locate region for rowkey: " + Bytes.toString(rowKey) +
+                  " in table: " + tableName, e);
+                loc = null;
+              }
+            }
+
+            if (null == loc) {
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("failed to get region location, so use default writer for rowkey: " +
+                  Bytes.toString(rowKey));
+              }
+              wl = getNewWriter(tableNameBytes, family, conf, null);
+            } else {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
+              }
+              InetSocketAddress initialIsa =
+                  new InetSocketAddress(loc.getHostname(), loc.getPort());
+              if (initialIsa.isUnresolved()) {
+                if (LOG.isTraceEnabled()) {
+                  LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
+                      + loc.getPort() + ", so use default writer");
+                }
+                wl = getNewWriter(tableNameBytes, family, conf, null);
+              } else {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
+                }
+                wl = getNewWriter(tableNameBytes, family, conf,
+                    new InetSocketAddress[] { initialIsa });
+              }
+            }
+          } else {
+            wl = getNewWriter(tableNameBytes, family, conf, null);
+          }
+        }
+
+        // We now have the proper HFile writer. Full steam ahead.
+        kv.updateLatestStamp(this.now);
+        wl.writer.append(kv);
+        wl.written += length;
+
+        // Remember the row key so we can detect when a new row starts.
+        this.previousRow = rowKey;
+      }
+
+      private void rollWriters() throws IOException {
+        for (WriterLength wl : this.writers.values()) {
+          if (wl.writer != null) {
+            LOG.info(
+                "Writer=" + wl.writer.getPath() + ((wl.written == 0)? "": ", wrote=" + wl.written));
+            close(wl.writer);
+          }
+          wl.writer = null;
+          wl.written = 0;
+        }
+        this.rollRequested = false;
+      }
+
+      /*
+       * Create a new StoreFileWriter for the given table and column family.
+       * @param family column family the writer is for
+       * @return A WriterLength, containing a new StoreFileWriter.
+       * @throws IOException
+       */
+      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+          justification="Not important")
+      private WriterLength getNewWriter(byte[] tableName, byte[] family, Configuration conf,
+          InetSocketAddress[] favoredNodes) throws IOException {
+        byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableName, family);
+        Path familydir = new Path(outputDir, Bytes.toString(family));
+        if (writeMultipleTables) {
+          familydir = new Path(outputDir,
+                  new Path(Bytes.toString(tableName), Bytes.toString(family)));
+        }
+        WriterLength wl = new WriterLength();
+        Algorithm compression = compressionMap.get(tableAndFamily);
+        compression = compression == null ? defaultCompression : compression;
+        BloomType bloomType = bloomTypeMap.get(tableAndFamily);
+        bloomType = bloomType == null ? BloomType.NONE : bloomType;
+        Integer blockSize = blockSizeMap.get(tableAndFamily);
+        blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
+        DataBlockEncoding encoding = overriddenEncoding;
+        encoding = encoding == null ? datablockEncodingMap.get(tableAndFamily) : encoding;
+        encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
+        Configuration tempConf = new Configuration(conf);
+        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
+        HFileContextBuilder contextBuilder = new HFileContextBuilder()
+                                    .withCompression(compression)
+                                    .withChecksumType(HStore.getChecksumType(conf))
+                                    .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+                                    .withBlockSize(blockSize);
+
+        if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+          contextBuilder.withIncludesTags(true);
+        }
+
+        contextBuilder.withDataBlockEncoding(encoding);
+        HFileContext hFileContext = contextBuilder.build();
+        if (null == favoredNodes) {
+          wl.writer =
+              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), fs)
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build();
+        } else {
+          wl.writer =
+              new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+                  .withOutputDir(familydir).withBloomType(bloomType)
+                  .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+                  .withFavoredNodes(favoredNodes).build();
+        }
+
+        this.writers.put(tableAndFamily, wl);
+        return wl;
+      }
+
+      private void close(final StoreFileWriter w) throws IOException {
+        if (w != null) {
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+              Bytes.toBytes(System.currentTimeMillis()));
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+              Bytes.toBytes(context.getTaskAttemptID().toString()));
+          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+              Bytes.toBytes(true));
+          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+              Bytes.toBytes(compactionExclude));
+          w.appendTrackedTimestampsToMetadata();
+          w.close();
+        }
+      }
+
+      @Override
+      public void close(TaskAttemptContext c) throws IOException, InterruptedException {
+        for (WriterLength wl: this.writers.values()) {
+          close(wl.writer);
+        }
+      }
+    };
+  }
+
+  /**
+   * Configure block storage policy for CF after the directory is created.
+   */
+  static void configureStoragePolicy(final Configuration conf, final FileSystem fs,
+      byte[] tableAndFamily, Path cfPath) {
+    if (null == conf || null == fs || null == tableAndFamily || null == cfPath) {
+      return;
+    }
+
+    String policy =
+        conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(tableAndFamily),
+          conf.get(STORAGE_POLICY_PROPERTY));
+    FSUtils.setStoragePolicy(fs, cfPath, policy);
+  }
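+  // Illustrative sketch (editor's example; the policy names are standard HDFS storage policies
+  // used here as assumptions, not defaults): a job can set a blanket policy or a per
+  // table-and-family policy, keyed by the combineTableNameSuffix() string, e.g.
+  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "HOT");
+  //   conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX
+  //       + Bytes.toString(combineTableNameSuffix(tableName, family)), "ALL_SSD");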
+
+  /*
+   * Data structure to hold a Writer and the amount of data written to it.
+   */
+  static class WriterLength {
+    long written = 0;
+    StoreFileWriter writer = null;
+  }
+
+  /**
+   * Return the start keys of all of the regions in the given tables,
+   * as a list of ImmutableBytesWritable.
+   */
+  private static List<ImmutableBytesWritable> getRegionStartKeys(
+      List<RegionLocator> regionLocators, boolean writeMultipleTables) throws IOException {
+
+    ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
+    for (RegionLocator regionLocator : regionLocators) {
+      TableName tableName = regionLocator.getName();
+      LOG.info("Looking up current regions for table " + tableName);
+      byte[][] byteKeys = regionLocator.getStartKeys();
+      for (byte[] byteKey : byteKeys) {
+        byte[] fullKey = byteKey; // HFileOutputFormat2 use case
+        if (writeMultipleTables) {
+          // MultiTableHFileOutputFormat use case
+          fullKey = combineTableNameSuffix(tableName.getName(), byteKey);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("SplitPoint startkey for table [" + tableName + "]: [" + Bytes.toStringBinary
+                  (fullKey) + "]");
+        }
+        ret.add(new ImmutableBytesWritable(fullKey));
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Write out a {@link SequenceFile} that can be read by
+   * {@link TotalOrderPartitioner} that contains the split points in startKeys.
+   */
+  @SuppressWarnings("deprecation")
+  private static void writePartitions(Configuration conf, Path partitionsPath,
+      List<ImmutableBytesWritable> startKeys, boolean writeMultipleTables) throws IOException {
+    LOG.info("Writing partition information to " + partitionsPath);
+    if (startKeys.isEmpty()) {
+      throw new IllegalArgumentException("No regions passed");
+    }
+
+    // We're generating a list of split points, and we don't ever
+    // have keys < the first region (which has an empty start key)
+    // so we need to remove it. Otherwise we would end up with an
+    // empty reducer with index 0
+    TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
+    ImmutableBytesWritable first = sorted.first();
+    if (writeMultipleTables) {
+      first = new ImmutableBytesWritable(
+          MultiTableHFileOutputFormat.getSuffix(sorted.first().get()));
+    }
+    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+      throw new IllegalArgumentException(
+          "First region of table should have empty start key. Instead has: "
+          + Bytes.toStringBinary(first.get()));
+    }
+    sorted.remove(sorted.first());
+
+    // Write the actual file
+    FileSystem fs = partitionsPath.getFileSystem(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(
+      fs, conf, partitionsPath, ImmutableBytesWritable.class,
+      NullWritable.class);
+
+    try {
+      for (ImmutableBytesWritable startKey : sorted) {
+        writer.append(startKey, NullWritable.get());
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer,
+   *     PutSortReducer or TextSortReducer, depending on the map output value class)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
+      throws IOException {
+    configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+  }
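+  /*
+   * Usage sketch (editor's example, not part of the original patch). The table name, mapper
+   * class, and paths below are placeholders; the mapper is assumed to emit
+   * (ImmutableBytesWritable, Put) pairs.
+   *
+   *   Configuration conf = HBaseConfiguration.create();
+   *   Job job = Job.getInstance(conf, "hfile-prepare");
+   *   job.setJarByClass(MyDriver.class);
+   *   job.setMapperClass(MyPutEmittingMapper.class);
+   *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+   *   job.setMapOutputValueClass(Put.class);
+   *   FileInputFormat.addInputPath(job, new Path("/input"));
+   *   FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
+   *   try (Connection conn = ConnectionFactory.createConnection(conf);
+   *       Table table = conn.getTable(TableName.valueOf("usertable"));
+   *       RegionLocator locator = conn.getRegionLocator(TableName.valueOf("usertable"))) {
+   *     HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
+   *   }
+   *   job.waitForCompletion(true);
+   */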
+
+  /**
+   * Configure a MapReduce Job to perform an incremental load into the given
+   * table. This
+   * <ul>
+   *   <li>Inspects the table to configure a total order partitioner</li>
+   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
+   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
+   *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
+   *   <li>Sets the reducer up to perform the appropriate sorting (KeyValueSortReducer,
+   *     PutSortReducer or TextSortReducer, depending on the map output value class)</li>
+   * </ul>
+   * The user should be sure to set the map output value class to either KeyValue or Put before
+   * running this function.
+   */
+  public static void configureIncrementalLoad(Job job, TableDescriptor tableDescriptor,
+      RegionLocator regionLocator) throws IOException {
+    ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
+    singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
+    configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
+  }
+
+  static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
+      Class<? extends OutputFormat<?, ?>> cls) throws IOException {
+    Configuration conf = job.getConfiguration();
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(cls);
+
+    if (multiTableInfo.stream().distinct().count() != multiTableInfo.size()) {
+      throw new IllegalArgumentException("Duplicate entries found in TableInfo argument");
+    }
+    boolean writeMultipleTables = false;
+    if (MultiTableHFileOutputFormat.class.equals(cls)) {
+      writeMultipleTables = true;
+      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+    }
+    // Based on the configured map output class, set the correct reducer to properly
+    // sort the incoming values.
+    // TODO it would be nice to pick one or the other of these formats.
+    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(KeyValueSortReducer.class);
+    } else if (Put.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(PutSortReducer.class);
+    } else if (Text.class.equals(job.getMapOutputValueClass())) {
+      job.setReducerClass(TextSortReducer.class);
+    } else {
+      LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
+    }
+
+    conf.setStrings("io.serializations", conf.get("io.serializations"),
+        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+        KeyValueSerialization.class.getName());
+
+    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
+      LOG.info("bulkload locality sensitive enabled");
+    }
+
+    /* Now get the region start keys for every table required */
+    List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
+    List<RegionLocator> regionLocators = new ArrayList<>(multiTableInfo.size());
+    List<TableDescriptor> tableDescriptors = new ArrayList<>(multiTableInfo.size());
+
+    for (TableInfo tableInfo : multiTableInfo) {
+      regionLocators.add(tableInfo.getRegionLocator());
+      allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
+      tableDescriptors.add(tableInfo.getTableDescriptor());
+    }
+    // Record table names so writers can use favored nodes and decode the per-table column
+    // family attributes (compression, block size, etc.).
+    conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
+        StringUtils.join(allTableNames, Bytes.toString(tableSeparator)));
+    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocators, writeMultipleTables);
+    // Use the tables' region boundaries as split points for the TotalOrderPartitioner.
+    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+        "to match current region count for all tables");
+    job.setNumReduceTasks(startKeys.size());
+
+    configurePartitioner(job, startKeys, writeMultipleTables);
+    // Serialize per-column-family attributes (compression, block size, bloom type and
+    // data block encoding) into the job configuration.
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(compressionDetails,
+            tableDescriptors));
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(blockSizeDetails,
+            tableDescriptors));
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, serializeColumnFamilyAttribute(bloomTypeDetails,
+            tableDescriptors));
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+            serializeColumnFamilyAttribute(dataBlockEncodingDetails, tableDescriptors));
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
+    LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
+  }
+
+  public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
+      throws IOException {
+    Configuration conf = job.getConfiguration();
+
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(KeyValue.class);
+    job.setOutputFormatClass(HFileOutputFormat2.class);
+
+    ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
+    singleTableDescriptor.add(tableDescriptor);
+
+    conf.set(OUTPUT_TABLE_NAME_CONF_KEY, tableDescriptor.getTableName().getNameAsString());
+    // Serialize per-column-family attributes (compression, block size, bloom type and
+    // data block encoding).
+    conf.set(COMPRESSION_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(compressionDetails, singleTableDescriptor));
+    conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(blockSizeDetails, singleTableDescriptor));
+    conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(bloomTypeDetails, singleTableDescriptor));
+    conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+        serializeColumnFamilyAttribute(dataBlockEncodingDetails, singleTableDescriptor));
+
+    TableMapReduceUtil.addDependencyJars(job);
+    TableMapReduceUtil.initCredentials(job);
+    LOG.info("Incremental table " + tableDescriptor.getTableName() + " output configured.");
+  }
+
+  /**
+   * Runs inside the task to deserialize column family to compression algorithm
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured compression algorithm
+   */
+  @VisibleForTesting
+  static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        COMPRESSION_FAMILIES_CONF_KEY);
+    Map<byte[], Algorithm> compressionMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      Algorithm algorithm = HFileWriterImpl.compressionByName(e.getValue());
+      compressionMap.put(e.getKey(), algorithm);
+    }
+    return compressionMap;
+  }
+
+  /**
+   * Runs inside the task to deserialize column family to bloom filter type
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured bloom filter type
+   */
+  @VisibleForTesting
+  static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        BLOOM_TYPE_FAMILIES_CONF_KEY);
+    Map<byte[], BloomType> bloomTypeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      BloomType bloomType = BloomType.valueOf(e.getValue());
+      bloomTypeMap.put(e.getKey(), bloomType);
+    }
+    return bloomTypeMap;
+  }
+
+  /**
+   * Runs inside the task to deserialize column family to block size
+   * map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to the configured block size
+   */
+  @VisibleForTesting
+  static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        BLOCK_SIZE_FAMILIES_CONF_KEY);
+    Map<byte[], Integer> blockSizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      Integer blockSize = Integer.parseInt(e.getValue());
+      blockSizeMap.put(e.getKey(), blockSize);
+    }
+    return blockSizeMap;
+  }
+
+  /**
+   * Runs inside the task to deserialize column family to data block encoding
+   * type map from the configuration.
+   *
+   * @param conf to read the serialized values from
+   * @return a map from column family to HFileDataBlockEncoder for the
+   *         configured data block type for the family
+   */
+  @VisibleForTesting
+  static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+      Configuration conf) {
+    Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+        DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+    Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
+      encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
+    }
+    return encoderMap;
+  }
+
+
+  /**
+   * Run inside the task to deserialize column family to given conf value map.
+   *
+   * @param conf to read the serialized values from
+   * @param confName conf key to read from the configuration
+   * @return a map of column family to the given configuration value
+   */
+  private static Map<byte[], String> createFamilyConfValueMap(
+      Configuration conf, String confName) {
+    Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    String confVal = conf.get(confName, "");
+    for (String familyConf : confVal.split("&")) {
+      String[] familySplit = familyConf.split("=");
+      if (familySplit.length != 2) {
+        continue;
+      }
+      try {
+        confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
+            URLDecoder.decode(familySplit[1], "UTF-8"));
+      } catch (UnsupportedEncodingException e) {
+        // will not happen with UTF-8 encoding
+        throw new AssertionError(e);
+      }
+    }
+    return confValMap;
+  }
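+  // Wire format sketch (illustrative values only): each entry is
+  //   urlencode(<tableName><tableSeparator><family>) "=" urlencode(<value>), entries joined by '&'.
+  // Assuming ';' as the table separator, a serialized compression map could look like
+  //   "usertable%3Bcf1=SNAPPY&usertable%3Bcf2=NONE"
+  // which decodes to {usertable;cf1 -> SNAPPY, usertable;cf2 -> NONE}.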
+
+  /**
+   * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
+   * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
+   */
+  static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
+      boolean writeMultipleTables) throws IOException {
+    Configuration conf = job.getConfiguration();
+    // create the partitions file
+    FileSystem fs = FileSystem.get(conf);
+    String hbaseTmpFsDir =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+          HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+    Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
+    partitionsPath = fs.makeQualified(partitionsPath);
+    writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
+    fs.deleteOnExit(partitionsPath);
+
+    // configure job to use it
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
+  }
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
+  @VisibleForTesting
+  static String serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
+      List<TableDescriptor> allTables) throws UnsupportedEncodingException {
+    StringBuilder attributeValue = new StringBuilder();
+    int i = 0;
+    for (TableDescriptor tableDescriptor : allTables) {
+      if (tableDescriptor == null) {
+        // could happen with mock table instance
+        // CODEREVIEW: Can I set an empty string in conf if mock table instance?
+        return "";
+      }
+      for (ColumnFamilyDescriptor familyDescriptor : tableDescriptor.getColumnFamilies()) {
+        if (i++ > 0) {
+          attributeValue.append('&');
+        }
+        attributeValue.append(URLEncoder.encode(
+            Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
+                familyDescriptor.getName())), "UTF-8"));
+        attributeValue.append('=');
+        attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
+      }
+    }
+    // No trailing ampersand to strip: the separator is prepended before every entry
+    // after the first.
+    return attributeValue.toString();
+  }
+
+  /**
+   * Serialize column family to compression algorithm map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   */
+  @VisibleForTesting
+  static Function<ColumnFamilyDescriptor, String> compressionDetails = familyDescriptor ->
+          familyDescriptor.getCompressionType().getName();
+
+  /**
+   * Serialize column family to block size map to configuration. Invoked while
+   * configuring the MR job for incremental load.
+   */
+  @VisibleForTesting
+  static Function<ColumnFamilyDescriptor, String> blockSizeDetails = familyDescriptor -> String
+          .valueOf(familyDescriptor.getBlocksize());
+
+  /**
+   * Serialize column family to bloom type map to configuration. Invoked while
+   * configuring the MR job for incremental load.
+   */
+  @VisibleForTesting
+  static Function<ColumnFamilyDescriptor, String> bloomTypeDetails = familyDescriptor -> {
+    BloomType bloomType = familyDescriptor.getBloomFilterType();
+    if (bloomType == null) {
+      bloomType = ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER;
+    }
+    return bloomType.toString();
+  };
+
+  /**
+   * Serialize column family to data block encoding map to configuration.
+   * Invoked while configuring the MR job for incremental load.
+   */
+  @VisibleForTesting
+  static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails = familyDescriptor -> {
+    DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+    if (encoding == null) {
+      encoding = DataBlockEncoding.NONE;
+    }
+    return encoding.toString();
+  };
+
+}