Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/26 01:39:38 UTC

[38/41] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
new file mode 100644
index 0000000..3c3060b
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -0,0 +1,140 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * This is used to partition the output keys into groups of keys.
+ * Keys are grouped according to the regions that currently exist,
+ * so that each reducer fills a single region and load is distributed.
+ *
+ * <p>This class is not suitable as a partitioner for creating hfiles
+ * for incremental bulk loads, as the region spread will likely change between the time of
+ * hfile creation and load time. See {@link LoadIncrementalHFiles}
+ * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
+ *
+ * @param <KEY>  The type of the key.
+ * @param <VALUE>  The type of the value.
+ */
+@InterfaceAudience.Public
+public class HRegionPartitioner<KEY, VALUE>
+extends Partitioner<ImmutableBytesWritable, VALUE>
+implements Configurable {
+
+  private static final Log LOG = LogFactory.getLog(HRegionPartitioner.class);
+  private Configuration conf = null;
+  // Connection and locator are not cleaned up; they just die when partitioner is done.
+  private Connection connection;
+  private RegionLocator locator;
+  private byte[][] startKeys;
+
+  /**
+   * Gets the partition number for a given key (hence record) given the total
+   * number of partitions, i.e. the number of reduce tasks for the job.
+   *
+   * <p>Typically a hash function on all or a subset of the key.</p>
+   *
+   * @param key  The key to be partitioned.
+   * @param value  The entry value.
+   * @param numPartitions  The total number of partitions.
+   * @return The partition number for the <code>key</code>.
+   * @see org.apache.hadoop.mapreduce.Partitioner#getPartition(
+   *   java.lang.Object, java.lang.Object, int)
+   */
+  @Override
+  public int getPartition(ImmutableBytesWritable key,
+      VALUE value, int numPartitions) {
+    byte[] region = null;
+    // If there is only one region, return 0.
+    if (this.startKeys.length == 1){
+      return 0;
+    }
+    try {
+      // Not sure if this is cached after a split so we could have problems
+      // here if a region splits while mapping
+      region = this.locator.getRegionLocation(key.get()).getRegionInfo().getStartKey();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    for (int i = 0; i < this.startKeys.length; i++){
+      if (Bytes.compareTo(region, this.startKeys[i]) == 0 ){
+        if (i >= numPartitions-1){
+          // Covers the case where we have fewer reducers than regions.
+          return (Integer.toString(i).hashCode()
+              & Integer.MAX_VALUE) % numPartitions;
+        }
+        return i;
+      }
+    }
+    // If the above fails to find a matching start key, we still need to return something.
+    return 0;
+  }
+
+  /**
+   * Returns the current configuration.
+   *
+   * @return The current configuration.
+   * @see org.apache.hadoop.conf.Configurable#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Sets the configuration. This is used to determine the start keys for the
+   * given table.
+   *
+   * @param configuration  The configuration to set.
+   * @see org.apache.hadoop.conf.Configurable#setConf(
+   *   org.apache.hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = HBaseConfiguration.create(configuration);
+    try {
+      this.connection = ConnectionFactory.createConnection(HBaseConfiguration.create(conf));
+      TableName tableName = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
+      this.locator = this.connection.getRegionLocator(tableName);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+    try {
+      this.startKeys = this.locator.getStartKeys();
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+}
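
A usage sketch (not part of this patch) of wiring this partitioner into a reduce-to-table job.
The table name "example_table" and the choice of IdentityTableReducer are illustrative
assumptions; the mapper, which must emit Put/Delete values keyed for the reducer, is omitted.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.HRegionPartitioner;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ExampleRegionPartitionedJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "write-to-example_table");
    job.setJarByClass(ExampleRegionPartitionedJob.class);
    // Mapper setup omitted; it must emit Mutations (Put/Delete) keyed by row.
    // This overload sets TableOutputFormat for "example_table" and installs
    // HRegionPartitioner so each reducer writes into a single region.
    TableMapReduceUtil.initTableReducerJob(
        "example_table", IdentityTableReducer.class, job, HRegionPartitioner.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}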

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
new file mode 100644
index 0000000..2c8caf5
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -0,0 +1,747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Charsets;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Ordering;
+
+public class HashTable extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(HashTable.class);
+
+  private static final int DEFAULT_BATCH_SIZE = 8000;
+
+  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
+  final static String PARTITIONS_FILE_NAME = "partitions";
+  final static String MANIFEST_FILE_NAME = "manifest";
+  final static String HASH_DATA_DIR = "hashes";
+  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
+  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
+
+  TableHash tableHash = new TableHash();
+  Path destPath;
+
+  public HashTable(Configuration conf) {
+    super(conf);
+  }
+
+  public static class TableHash {
+
+    Path hashDir;
+
+    String tableName;
+    String families = null;
+    long batchSize = DEFAULT_BATCH_SIZE;
+    int numHashFiles = 0;
+    byte[] startRow = HConstants.EMPTY_START_ROW;
+    byte[] stopRow = HConstants.EMPTY_END_ROW;
+    int scanBatch = 0;
+    int versions = -1;
+    long startTime = 0;
+    long endTime = 0;
+
+    List<ImmutableBytesWritable> partitions;
+
+    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
+      TableHash tableHash = new TableHash();
+      FileSystem fs = hashDir.getFileSystem(conf);
+      tableHash.hashDir = hashDir;
+      tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
+      tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
+      return tableHash;
+    }
+
+    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
+      Properties p = new Properties();
+      p.setProperty("table", tableName);
+      if (families != null) {
+        p.setProperty("columnFamilies", families);
+      }
+      p.setProperty("targetBatchSize", Long.toString(batchSize));
+      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
+      if (!isTableStartRow(startRow)) {
+        p.setProperty("startRowHex", Bytes.toHex(startRow));
+      }
+      if (!isTableEndRow(stopRow)) {
+        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
+      }
+      if (scanBatch > 0) {
+        p.setProperty("scanBatch", Integer.toString(scanBatch));
+      }
+      if (versions >= 0) {
+        p.setProperty("versions", Integer.toString(versions));
+      }
+      if (startTime != 0) {
+        p.setProperty("startTimestamp", Long.toString(startTime));
+      }
+      if (endTime != 0) {
+        p.setProperty("endTimestamp", Long.toString(endTime));
+      }
+
+      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
+        p.store(osw, null);
+      }
+    }
+
+    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
+      Properties p = new Properties();
+      try (FSDataInputStream in = fs.open(path)) {
+        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
+          p.load(isr);
+        }
+      }
+      tableName = p.getProperty("table");
+      families = p.getProperty("columnFamilies");
+      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
+      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
+
+      String startRowHex = p.getProperty("startRowHex");
+      if (startRowHex != null) {
+        startRow = Bytes.fromHex(startRowHex);
+      }
+      String stopRowHex = p.getProperty("stopRowHex");
+      if (stopRowHex != null) {
+        stopRow = Bytes.fromHex(stopRowHex);
+      }
+
+      String scanBatchString = p.getProperty("scanBatch");
+      if (scanBatchString != null) {
+        scanBatch = Integer.parseInt(scanBatchString);
+      }
+
+      String versionString = p.getProperty("versions");
+      if (versionString != null) {
+        versions = Integer.parseInt(versionString);
+      }
+
+      String startTimeString = p.getProperty("startTimestamp");
+      if (startTimeString != null) {
+        startTime = Long.parseLong(startTimeString);
+      }
+
+      String endTimeString = p.getProperty("endTimestamp");
+      if (endTimeString != null) {
+        endTime = Long.parseLong(endTimeString);
+      }
+    }
+
+    Scan initScan() throws IOException {
+      Scan scan = new Scan();
+      scan.setCacheBlocks(false);
+      if (startTime != 0 || endTime != 0) {
+        scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+      }
+      if (scanBatch > 0) {
+        scan.setBatch(scanBatch);
+      }
+      if (versions >= 0) {
+        scan.setMaxVersions(versions);
+      }
+      if (!isTableStartRow(startRow)) {
+        scan.setStartRow(startRow);
+      }
+      if (!isTableEndRow(stopRow)) {
+        scan.setStopRow(stopRow);
+      }
+      if(families != null) {
+        for(String fam : families.split(",")) {
+          scan.addFamily(Bytes.toBytes(fam));
+        }
+      }
+      return scan;
+    }
+
+    /**
+     * Choose partitions between row ranges to hash to a single output file.
+     * Selects region boundaries that fall within the scan range, and groups them
+     * into the desired number of partitions.
+     */
+    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
+      List<byte[]> startKeys = new ArrayList<>();
+      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
+        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
+        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
+
+        // if the scan begins after this region ends, or ends before this region starts,
+        // then drop this region; in other words:
+        //   IF (scan begins before the end of this region
+        //      AND scan ends after the start of this region)
+        //   THEN include this region
+        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
+            || Bytes.compareTo(startRow, regionEndKey) < 0)
+          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
+            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+          startKeys.add(regionStartKey);
+        }
+      }
+
+      int numRegions = startKeys.size();
+      if (numHashFiles == 0) {
+        numHashFiles = numRegions / 100;
+      }
+      if (numHashFiles == 0) {
+        numHashFiles = 1;
+      }
+      if (numHashFiles > numRegions) {
+        // can't partition within regions
+        numHashFiles = numRegions;
+      }
+
+      // choose a subset of start keys to group regions into ranges
+      partitions = new ArrayList<>(numHashFiles - 1);
+      // skip the first start key as it is not a partition between ranges.
+      for (long i = 1; i < numHashFiles; i++) {
+        int splitIndex = (int) (numRegions * i / numHashFiles);
+        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
+      }
+    }
+
+    void writePartitionFile(Configuration conf, Path path) throws IOException {
+      FileSystem fs = path.getFileSystem(conf);
+      @SuppressWarnings("deprecation")
+      SequenceFile.Writer writer = SequenceFile.createWriter(
+        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
+
+      for (int i = 0; i < partitions.size(); i++) {
+        writer.append(partitions.get(i), NullWritable.get());
+      }
+      writer.close();
+    }
+
+    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
+         throws IOException {
+      @SuppressWarnings("deprecation")
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      ImmutableBytesWritable key = new ImmutableBytesWritable();
+      partitions = new ArrayList<>();
+      while (reader.next(key)) {
+        partitions.add(new ImmutableBytesWritable(key.copyBytes()));
+      }
+      reader.close();
+
+      if (!Ordering.natural().isOrdered(partitions)) {
+        throw new IOException("Partitions are not ordered!");
+      }
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("tableName=").append(tableName);
+      if (families != null) {
+        sb.append(", families=").append(families);
+      }
+      sb.append(", batchSize=").append(batchSize);
+      sb.append(", numHashFiles=").append(numHashFiles);
+      if (!isTableStartRow(startRow)) {
+        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
+      }
+      if (!isTableEndRow(stopRow)) {
+        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
+      }
+      if (scanBatch >= 0) {
+        sb.append(", scanBatch=").append(scanBatch);
+      }
+      if (versions >= 0) {
+        sb.append(", versions=").append(versions);
+      }
+      if (startTime != 0) {
+        sb.append("startTime=").append(startTime);
+      }
+      if (endTime != 0) {
+        sb.append("endTime=").append(endTime);
+      }
+      return sb.toString();
+    }
+
+    static String getDataFileName(int hashFileIndex) {
+      return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
+    }
+
+    /**
+     * Open a TableHash.Reader starting at the first hash at or after the given key.
+     * @throws IOException
+     */
+    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
+        throws IOException {
+      return new Reader(conf, startKey);
+    }
+
+    public class Reader implements java.io.Closeable {
+      private final Configuration conf;
+
+      private int hashFileIndex;
+      private MapFile.Reader mapFileReader;
+
+      private boolean cachedNext;
+      private ImmutableBytesWritable key;
+      private ImmutableBytesWritable hash;
+
+      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
+        this.conf = conf;
+        int partitionIndex = Collections.binarySearch(partitions, startKey);
+        if (partitionIndex >= 0) {
+          // if the key is equal to a partition, then go to the file after that partition
+          hashFileIndex = partitionIndex+1;
+        } else {
+          // if the key is between partitions, then go to the file between those partitions
+          hashFileIndex = -1-partitionIndex;
+        }
+        openHashFile();
+
+        // MapFiles don't make it easy to seek() so that the subsequent next() returns
+        // the desired key/value pair.  So we cache it for the first call of next().
+        hash = new ImmutableBytesWritable();
+        key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
+        if (key == null) {
+          cachedNext = false;
+          hash = null;
+        } else {
+          cachedNext = true;
+        }
+      }
+
+      /**
+       * Read the next key/hash pair.
+       * Returns true if such a pair exists and false when at the end of the data.
+       */
+      public boolean next() throws IOException {
+        if (cachedNext) {
+          cachedNext = false;
+          return true;
+        }
+        key = new ImmutableBytesWritable();
+        hash = new ImmutableBytesWritable();
+        while (true) {
+          boolean hasNext = mapFileReader.next(key, hash);
+          if (hasNext) {
+            return true;
+          }
+          hashFileIndex++;
+          if (hashFileIndex < TableHash.this.numHashFiles) {
+            mapFileReader.close();
+            openHashFile();
+          } else {
+            key = null;
+            hash = null;
+            return false;
+          }
+        }
+      }
+
+      /**
+       * Get the current key
+       * @return the current key or null if there is no current key
+       */
+      public ImmutableBytesWritable getCurrentKey() {
+        return key;
+      }
+
+      /**
+       * Get the current hash
+       * @return the current hash or null if there is no current hash
+       */
+      public ImmutableBytesWritable getCurrentHash() {
+        return hash;
+      }
+
+      private void openHashFile() throws IOException {
+        if (mapFileReader != null) {
+          mapFileReader.close();
+        }
+        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
+        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
+        mapFileReader = new MapFile.Reader(dataFile, conf);
+      }
+
+      @Override
+      public void close() throws IOException {
+        mapFileReader.close();
+      }
+    }
+  }
+
+  static boolean isTableStartRow(byte[] row) {
+    return Bytes.equals(HConstants.EMPTY_START_ROW, row);
+  }
+
+  static boolean isTableEndRow(byte[] row) {
+    return Bytes.equals(HConstants.EMPTY_END_ROW, row);
+  }
+
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
+    generatePartitions(partitionsPath);
+
+    Job job = Job.getInstance(getConf(),
+          getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
+    Configuration jobConf = job.getConfiguration();
+    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
+    job.setJarByClass(HashTable.class);
+
+    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
+        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+
+    // use a TotalOrderPartitioner and reducers to group region output into hash files
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
+    job.setReducerClass(Reducer.class);  // identity reducer
+    job.setNumReduceTasks(tableHash.numHashFiles);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(ImmutableBytesWritable.class);
+    job.setOutputFormatClass(MapFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
+
+    return job;
+  }
+
+  private void generatePartitions(Path partitionsPath) throws IOException {
+    Connection connection = ConnectionFactory.createConnection(getConf());
+    Pair<byte[][], byte[][]> regionKeys
+      = connection.getRegionLocator(TableName.valueOf(tableHash.tableName)).getStartEndKeys();
+    connection.close();
+
+    tableHash.selectPartitions(regionKeys);
+    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
+
+    tableHash.writePartitionFile(getConf(), partitionsPath);
+  }
+
+  static class ResultHasher {
+    private MessageDigest digest;
+
+    private boolean batchStarted = false;
+    private ImmutableBytesWritable batchStartKey;
+    private ImmutableBytesWritable batchHash;
+    private long batchSize = 0;
+
+
+    public ResultHasher() {
+      try {
+        digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        Throwables.propagate(e);
+      }
+    }
+
+    public void startBatch(ImmutableBytesWritable row) {
+      if (batchStarted) {
+        throw new RuntimeException("Cannot start new batch without finishing existing one.");
+      }
+      batchStarted = true;
+      batchSize = 0;
+      batchStartKey = row;
+      batchHash = null;
+    }
+
+    public void hashResult(Result result) {
+      if (!batchStarted) {
+        throw new RuntimeException("Cannot add to batch that has not been started.");
+      }
+      for (Cell cell : result.rawCells()) {
+        int rowLength = cell.getRowLength();
+        int familyLength = cell.getFamilyLength();
+        int qualifierLength = cell.getQualifierLength();
+        int valueLength = cell.getValueLength();
+        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
+        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
+        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
+        long ts = cell.getTimestamp();
+        for (int i = 8; i > 0; i--) {
+          digest.update((byte) ts);
+          ts >>>= 8;
+        }
+        digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
+
+        batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
+      }
+    }
+
+    public void finishBatch() {
+      if (!batchStarted) {
+        throw new RuntimeException("Cannot finish batch that has not started.");
+      }
+      batchStarted = false;
+      batchHash = new ImmutableBytesWritable(digest.digest());
+    }
+
+    public boolean isBatchStarted() {
+      return batchStarted;
+    }
+
+    public ImmutableBytesWritable getBatchStartKey() {
+      return batchStartKey;
+    }
+
+    public ImmutableBytesWritable getBatchHash() {
+      return batchHash;
+    }
+
+    public long getBatchSize() {
+      return batchSize;
+    }
+  }
+
+  public static class HashMapper
+    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private ResultHasher hasher;
+    private long targetBatchSize;
+
+    private ImmutableBytesWritable currentRow;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      targetBatchSize = context.getConfiguration()
+          .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+      hasher = new ResultHasher();
+
+      TableSplit split = (TableSplit) context.getInputSplit();
+      hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+
+      if (currentRow == null || !currentRow.equals(key)) {
+        currentRow = new ImmutableBytesWritable(key); // not immutable
+
+        if (hasher.getBatchSize() >= targetBatchSize) {
+          hasher.finishBatch();
+          context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+          hasher.startBatch(currentRow);
+        }
+      }
+
+      hasher.hashResult(value);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      hasher.finishBatch();
+      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+    }
+  }
+
+  private void writeTempManifestFile() throws IOException {
+    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+    FileSystem fs = tempManifestPath.getFileSystem(getConf());
+    tableHash.writePropertiesFile(fs, tempManifestPath);
+  }
+
+  private void completeManifest() throws IOException {
+    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
+    FileSystem fs = tempManifestPath.getFileSystem(getConf());
+    fs.rename(tempManifestPath, manifestPath);
+  }
+
+  private static final int NUM_ARGS = 2;
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+      System.err.println();
+    }
+    System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
+    System.err.println();
+    System.err.println("Options:");
+    System.err.println(" batchsize     the target amount of bytes to hash in each batch");
+    System.err.println("               rows are added to the batch until this size is reached");
+    System.err.println("               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
+    System.err.println(" numhashfiles  the number of hash files to create");
+    System.err.println("               if set to fewer than number of regions then");
+    System.err.println("               the job will create this number of reducers");
+    System.err.println("               (defaults to 1/100 of regions -- at least 1)");
+    System.err.println(" startrow      the start row");
+    System.err.println(" stoprow       the stop row");
+    System.err.println(" starttime     beginning of the time range (unixtime in millis)");
+    System.err.println("               without endtime means from starttime to forever");
+    System.err.println(" endtime       end of the time range.  Ignored if no starttime specified.");
+    System.err.println(" scanbatch     scanner batch size to support intra row scans");
+    System.err.println(" versions      number of cell versions to include");
+    System.err.println(" families      comma-separated list of families to include");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" tablename     Name of the table to hash");
+    System.err.println(" outputpath    Filesystem path to put the output data");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
+    System.err.println(" $ hbase " +
+        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
+        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
+        + " TestTable /hashes/testTable");
+  }
+
+  private boolean doCommandLine(final String[] args) {
+    if (args.length < NUM_ARGS) {
+      printUsage(null);
+      return false;
+    }
+    try {
+
+      tableHash.tableName = args[args.length-2];
+      destPath = new Path(args[args.length-1]);
+
+      for (int i = 0; i < args.length - NUM_ARGS; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        final String batchSizeArgKey = "--batchsize=";
+        if (cmd.startsWith(batchSizeArgKey)) {
+          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
+          continue;
+        }
+
+        final String numHashFilesArgKey = "--numhashfiles=";
+        if (cmd.startsWith(numHashFilesArgKey)) {
+          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
+          continue;
+        }
+
+        final String startRowArgKey = "--startrow=";
+        if (cmd.startsWith(startRowArgKey)) {
+          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
+          continue;
+        }
+
+        final String stopRowArgKey = "--stoprow=";
+        if (cmd.startsWith(stopRowArgKey)) {
+          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
+          continue;
+        }
+
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String scanBatchArgKey = "--scanbatch=";
+        if (cmd.startsWith(scanBatchArgKey)) {
+          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
+          continue;
+        }
+
+        final String versionsArgKey = "--versions=";
+        if (cmd.startsWith(versionsArgKey)) {
+          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          tableHash.families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        printUsage("Invalid argument '" + cmd + "'");
+        return false;
+      }
+      if ((tableHash.startTime != 0 || tableHash.endTime != 0)
+          && (tableHash.startTime >= tableHash.endTime)) {
+        printUsage("Invalid time range filter: starttime="
+            + tableHash.startTime + " >=  endtime=" + tableHash.endTime);
+        return false;
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Main entry point.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+    if (!doCommandLine(otherArgs)) {
+      return 1;
+    }
+
+    Job job = createSubmittableJob(otherArgs);
+    writeTempManifestFile();
+    if (!job.waitForCompletion(true)) {
+      LOG.info("Map-reduce job failed!");
+      return 1;
+    }
+    completeManifest();
+    return 0;
+  }
+
+}
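
A usage sketch (not part of this patch) of reading the output of a completed HashTable run
back with TableHash and its Reader, much as SyncTable does. The path "/hashes/testTable" is
the example output path from the usage text above; the rest follows the API defined in this file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HashTable.TableHash;
import org.apache.hadoop.hbase.util.Bytes;

public class ExampleReadHashes {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Reads the manifest and partitions file written by a previous HashTable run.
    TableHash tableHash = TableHash.read(conf, new Path("/hashes/testTable"));
    System.out.println("Hashed scan: " + tableHash);
    // Walk every (batch start key, batch hash) pair, starting from the first hash file.
    try (TableHash.Reader reader =
        tableHash.newReader(conf, new ImmutableBytesWritable(HConstants.EMPTY_START_ROW))) {
      while (reader.next()) {
        System.out.println(Bytes.toStringBinary(reader.getCurrentKey().get())
            + " -> " + Bytes.toHex(reader.getCurrentHash().get()));
      }
    }
  }
}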

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
new file mode 100644
index 0000000..7103ef8
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.java
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Pass the given key and record as-is to the reduce phase.
+ */
+@InterfaceAudience.Public
+public class IdentityTableMapper
+extends TableMapper<ImmutableBytesWritable, Result> {
+
+  /**
+   * Use this before submitting a TableMap job. It will appropriately set up
+   * the job.
+   *
+   * @param table  The table name.
+   * @param scan  The scan with the columns to scan.
+   * @param mapper  The mapper class.
+   * @param job  The job configuration.
+   * @throws IOException When setting up the job fails.
+   */
+  @SuppressWarnings("rawtypes")
+  public static void initJob(String table, Scan scan,
+    Class<? extends TableMapper> mapper, Job job) throws IOException {
+    TableMapReduceUtil.initTableMapperJob(table, scan, mapper,
+      ImmutableBytesWritable.class, Result.class, job);
+  }
+
+  /**
+   * Pass the key, value to reduce.
+   *
+   * @param key  The current key.
+   * @param value  The current value.
+   * @param context  The current context.
+   * @throws IOException When writing the record fails.
+   * @throws InterruptedException When the job is aborted.
+   */
+  public void map(ImmutableBytesWritable key, Result value, Context context)
+  throws IOException, InterruptedException {
+    context.write(key, value);
+  }
+
+}
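
A usage sketch (not part of this patch) of setting up a scan-only map phase with
IdentityTableMapper.initJob. The table name "example_table" is an illustrative assumption;
reducer and output configuration are left out.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ExampleIdentityScanJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "scan-example_table");
    job.setJarByClass(ExampleIdentityScanJob.class);
    Scan scan = new Scan();
    scan.setCacheBlocks(false); // MR scans should not pollute the block cache
    // Configures TableInputFormat and the ImmutableBytesWritable/Result map output types.
    IdentityTableMapper.initJob("example_table", scan, IdentityTableMapper.class, job);
    // A reducer, or an output format for the (row key, Result) pairs, would be set up here.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}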

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
new file mode 100644
index 0000000..73475db
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Convenience class that simply writes all values (which must be
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} instances)
+ * passed to it out to the configured HBase table. This works in combination
+ * with {@link TableOutputFormat} which actually does the writing to HBase.<p>
+ *
+ * Keys are passed along but ignored in TableOutputFormat.  However, they can
+ * be used to control how your values will be divided up amongst the specified
+ * number of reducers. <p>
+ *
+ * You can also use the {@link TableMapReduceUtil} class to set up the two
+ * classes in one step:
+ * <blockquote><code>
+ * TableMapReduceUtil.initTableReducerJob("table", IdentityTableReducer.class, job);
+ * </code></blockquote>
+ * This will also set the proper {@link TableOutputFormat} which is given the
+ * <code>table</code> parameter. The
+ * {@link org.apache.hadoop.hbase.client.Put Put} or
+ * {@link org.apache.hadoop.hbase.client.Delete Delete} define the
+ * row and columns implicitly.
+ */
+@InterfaceAudience.Public
+public class IdentityTableReducer
+extends TableReducer<Writable, Mutation, Writable> {
+
+  @SuppressWarnings("unused")
+  private static final Log LOG = LogFactory.getLog(IdentityTableReducer.class);
+
+  /**
+   * Writes each given record, consisting of the row key and the given values,
+   * to the configured {@link org.apache.hadoop.mapreduce.OutputFormat}.
+   * It emits the row key and each {@link org.apache.hadoop.hbase.client.Put Put}
+   * or {@link org.apache.hadoop.hbase.client.Delete Delete} as a separate pair.
+   *
+   * @param key  The current row key.
+   * @param values  The {@link org.apache.hadoop.hbase.client.Put Put} or
+   *   {@link org.apache.hadoop.hbase.client.Delete Delete} list for the given
+   *   row.
+   * @param context  The context of the reduce.
+   * @throws IOException When writing the record fails.
+   * @throws InterruptedException When the job gets interrupted.
+   */
+  @Override
+  public void reduce(Writable key, Iterable<Mutation> values, Context context)
+  throws IOException, InterruptedException {
+    for(Mutation putOrDelete : values) {
+      context.write(key, putOrDelete);
+    }
+  }
+}
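
A usage sketch (not part of this patch) pairing a mapper that repackages scanned Results as
Puts with IdentityTableReducer, along the lines of the javadoc above. ResultToPutMapper,
"source_table" and "dest_table" are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;

public class ExampleCopyTableJob {
  // Hypothetical mapper: turns each scanned Result into a Put for the same row.
  static class ResultToPutMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
        throws IOException, InterruptedException {
      Put put = new Put(row.copyBytes());
      for (Cell cell : value.rawCells()) {
        put.add(cell);
      }
      context.write(row, put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "copy-source_table-to-dest_table");
    job.setJarByClass(ExampleCopyTableJob.class);
    TableMapReduceUtil.initTableMapperJob("source_table", new Scan(),
        ResultToPutMapper.class, ImmutableBytesWritable.class, Put.class, job);
    // IdentityTableReducer forwards each Put unchanged to TableOutputFormat for "dest_table".
    TableMapReduceUtil.initTableReducerJob("dest_table", IdentityTableReducer.class, job);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}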

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
new file mode 100644
index 0000000..18dcf35
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
@@ -0,0 +1,780 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.zookeeper.KeeperException;
+
+
+/**
+ * Import data written by {@link Export}.
+ */
+@InterfaceAudience.Public
+public class Import extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(Import.class);
+  final static String NAME = "import";
+  public final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
+  public final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  public final static String FILTER_CLASS_CONF_KEY = "import.filter.class";
+  public final static String FILTER_ARGS_CONF_KEY = "import.filter.args";
+  public final static String TABLE_NAME = "import.table.name";
+  public final static String WAL_DURABILITY = "import.wal.durability";
+  public final static String HAS_LARGE_RESULT= "import.bulk.hasLargeResult";
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public static class KeyValueWritableComparablePartitioner
+      extends Partitioner<KeyValueWritableComparable, KeyValue> {
+    private static KeyValueWritableComparable[] START_KEYS = null;
+    @Override
+    public int getPartition(KeyValueWritableComparable key, KeyValue value,
+        int numPartitions) {
+      for (int i = 0; i < START_KEYS.length; ++i) {
+        if (key.compareTo(START_KEYS[i]) <= 0) {
+          return i;
+        }
+      }
+      return START_KEYS.length;
+    }
+
+  }
+
+  public static class KeyValueWritableComparable
+      implements WritableComparable<KeyValueWritableComparable> {
+
+    private KeyValue kv = null;
+
+    static {
+      // register this comparator
+      WritableComparator.define(KeyValueWritableComparable.class,
+          new KeyValueWritableComparator());
+    }
+
+    public KeyValueWritableComparable() {
+    }
+
+    public KeyValueWritableComparable(KeyValue kv) {
+      this.kv = kv;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      KeyValue.write(kv, out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      kv = KeyValue.create(in);
+    }
+
+    @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification="This is wrong, yes, but we should be purging Writables, not fixing them")
+    public int compareTo(KeyValueWritableComparable o) {
+      return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
+    }
+
+    public static class KeyValueWritableComparator extends WritableComparator {
+
+      @Override
+      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        try {
+          KeyValueWritableComparable kv1 = new KeyValueWritableComparable();
+          kv1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+          KeyValueWritableComparable kv2 = new KeyValueWritableComparable();
+          kv2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+          return compare(kv1, kv2);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+    }
+
+  }
+
+  public static class KeyValueReducer
+      extends
+      Reducer<KeyValueWritableComparable, KeyValue, ImmutableBytesWritable, KeyValue> {
+    protected void reduce(
+        KeyValueWritableComparable row,
+        Iterable<KeyValue> kvs,
+        Reducer<KeyValueWritableComparable,
+          KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+        throws java.io.IOException, InterruptedException {
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(new ImmutableBytesWritable(kv.getRowArray()), kv);
+        if (++index % 100 == 0)
+          context.setStatus("Wrote " + index + " KeyValues, "
+              + "and the rowkey whose is being wrote is " + Bytes.toString(kv.getRowArray()));
+      }
+    }
+  }
+
+  public static class KeyValueSortImporter
+      extends TableMapper<KeyValueWritableComparable, KeyValue> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private Filter filter;
+    private static final Log LOG = LogFactory.getLog(KeyValueSortImporter.class);
+
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Considering the row."
+              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null
+            || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
+                (short) row.getLength()))) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            KeyValue ret = KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap));
+            context.write(new KeyValueWritableComparable(ret.createKeyOnly(false)), ret);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
+      int reduceNum = context.getNumReduceTasks();
+      Configuration conf = context.getConfiguration();
+      TableName tableName = TableName.valueOf(context.getConfiguration().get(TABLE_NAME));
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        byte[][] startKeys = regionLocator.getStartKeys();
+        if (startKeys.length != reduceNum) {
+          throw new IOException("Region split after job initialization");
+        }
+        KeyValueWritableComparable[] startKeyWraps =
+            new KeyValueWritableComparable[startKeys.length - 1];
+        for (int i = 1; i < startKeys.length; ++i) {
+          startKeyWraps[i - 1] =
+              new KeyValueWritableComparable(KeyValueUtil.createFirstOnRow(startKeys[i]));
+        }
+        KeyValueWritableComparablePartitioner.START_KEYS = startKeyWraps;
+      }
+    }
+  }
+
+  /**
+   * A mapper that just writes out KeyValues.
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification="Writables are going away and this has been this way forever")
+  public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private Filter filter;
+    private static final Log LOG = LogFactory.getLog(KeyValueImporter.class);
+
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Considering the row."
+              + Bytes.toString(row.get(), row.getOffset(), row.getLength()));
+        }
+        if (filter == null
+            || !filter.filterRowKey(CellUtil.createFirstOnRow(row.get(), row.getOffset(),
+                (short) row.getLength()))) {
+          for (Cell kv : value.rawCells()) {
+            kv = filterKv(filter, kv);
+            // skip if we filtered it out
+            if (kv == null) continue;
+            // TODO get rid of ensureKeyValue
+            context.write(row, KeyValueUtil.ensureKeyValue(convertKv(kv, cfRenameMap)));
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) {
+      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      filter = instantiateFilter(context.getConfiguration());
+    }
+  }
+
+  /**
+   * Write table content out to files in hdfs.
+   */
+  public static class Importer extends TableMapper<ImmutableBytesWritable, Mutation> {
+    private Map<byte[], byte[]> cfRenameMap;
+    private List<UUID> clusterIds;
+    private Filter filter;
+    private Durability durability;
+
+    /**
+     * @param row  The current table row key.
+     * @param value  The columns.
+     * @param context  The current context.
+     * @throws IOException When something is broken with the data.
+     */
+    @Override
+    public void map(ImmutableBytesWritable row, Result value,
+      Context context)
+    throws IOException {
+      try {
+        writeResult(row, value, context);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    private void writeResult(ImmutableBytesWritable key, Result result, Context context)
+    throws IOException, InterruptedException {
+      Put put = null;
+      Delete delete = null;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Considering the row."
+            + Bytes.toString(key.get(), key.getOffset(), key.getLength()));
+      }
+      if (filter == null
+          || !filter.filterRowKey(CellUtil.createFirstOnRow(key.get(), key.getOffset(),
+              (short) key.getLength()))) {
+        processKV(key, result, context, put, delete);
+      }
+    }
+
+    protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put,
+        Delete delete) throws IOException, InterruptedException {
+      for (Cell kv : result.rawCells()) {
+        kv = filterKv(filter, kv);
+        // skip if we filter it out
+        if (kv == null) continue;
+
+        kv = convertKv(kv, cfRenameMap);
+        // Deletes and Puts are gathered and written when finished
+        /*
+         * If an Export contains a sequence of mutations and tombstones, then after Import the
+         * same sequence should be restored as-is. If we combined all Delete tombstones into a
+         * single request, there would be a chance of ignoring some DeleteFamily tombstones,
+         * because if we submit multiple DeleteFamily tombstones in a single Delete request, only
+         * the newest is kept in the hbase table and the others are ignored. See HBASE-12065.
+         */
+        if (CellUtil.isDeleteFamily(kv)) {
+          Delete deleteFamily = new Delete(key.get());
+          deleteFamily.add(kv);
+          if (durability != null) {
+            deleteFamily.setDurability(durability);
+          }
+          deleteFamily.setClusterIds(clusterIds);
+          context.write(key, deleteFamily);
+        } else if (CellUtil.isDelete(kv)) {
+          if (delete == null) {
+            delete = new Delete(key.get());
+          }
+          delete.add(kv);
+        } else {
+          if (put == null) {
+            put = new Put(key.get());
+          }
+          addPutToKv(put, kv);
+        }
+      }
+      if (put != null) {
+        if (durability != null) {
+          put.setDurability(durability);
+        }
+        put.setClusterIds(clusterIds);
+        context.write(key, put);
+      }
+      if (delete != null) {
+        if (durability != null) {
+          delete.setDurability(durability);
+        }
+        delete.setClusterIds(clusterIds);
+        context.write(key, delete);
+      }
+    }
+
+    protected void addPutToKv(Put put, Cell kv) throws IOException {
+      put.add(kv);
+    }
+
+    @Override
+    public void setup(Context context) {
+      LOG.info("Setting up " + getClass() + " mapper.");
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      filter = instantiateFilter(conf);
+      String durabilityStr = conf.get(WAL_DURABILITY);
+      if (durabilityStr != null) {
+        durability = Durability.valueOf(durabilityStr.toUpperCase(Locale.ROOT));
+        LOG.info("Setting WAL durability to " + durability);
+      } else {
+        LOG.info("Setting WAL durability to default.");
+      }
+      // TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
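+      // The cluster id read here is attached to every emitted mutation (setClusterIds in
+      // processKV); in replication setups this records where the edits originated so they are
+      // not shipped back to this cluster in a cycle.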
+      ZooKeeperWatcher zkw = null;
+      Exception ex = null;
+      try {
+        zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
+        clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
+      } catch (ZooKeeperConnectionException e) {
+        ex = e;
+        LOG.error("Problem connecting to ZooKeper during task setup", e);
+      } catch (KeeperException e) {
+        ex = e;
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        ex = e;
+        LOG.error("Problem setting up task", e);
+      } finally {
+        if (zkw != null) zkw.close();
+      }
+      if (clusterIds == null) {
+        // fail the task if we could not read the cluster id during setup
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+  /**
+   * Create a {@link Filter} to apply to all incoming keys ({@link KeyValue KeyValues}) so that
+   * some of them can optionally be excluded from the job output.
+   * @param conf {@link Configuration} from which to load the filter
+   * @return the filter to use for the task, or <tt>null</tt> if no filter should be used
+   * @throws IllegalArgumentException if the filter is misconfigured
+   */
+  public static Filter instantiateFilter(Configuration conf) {
+    // get the filter, if it was configured
+    Class<? extends Filter> filterClass = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+    if (filterClass == null) {
+      LOG.debug("No configured filter class, accepting all keyvalues.");
+      return null;
+    }
+    LOG.debug("Attempting to create filter:" + filterClass);
+    String[] filterArgs = conf.getStrings(FILTER_ARGS_CONF_KEY);
+    ArrayList<byte[]> quotedArgs = toQuotedByteArrays(filterArgs);
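+    // The configured filter class must expose a public static
+    // createFilterFromArguments(ArrayList<byte[]>) factory (the same contract the HBase shell
+    // relies on); it is invoked reflectively below.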
+    try {
+      Method m = filterClass.getMethod("createFilterFromArguments", ArrayList.class);
+      return (Filter) m.invoke(null, quotedArgs);
+    } catch (IllegalAccessException | SecurityException | NoSuchMethodException
+        | IllegalArgumentException | InvocationTargetException e) {
+      LOG.error("Couldn't instantiate filter!", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static ArrayList<byte[]> toQuotedByteArrays(String... stringArgs) {
+    ArrayList<byte[]> quotedArgs = new ArrayList<>();
+    for (String stringArg : stringArgs) {
+      // all the filters' instantiation methods expect quoted args since they are coming from
+      // the shell, so add them here, though it shouldn't really be needed :-/
+      quotedArgs.add(Bytes.toBytes("'" + stringArg + "'"));
+    }
+    return quotedArgs;
+  }
+
+  /**
+   * Attempt to filter out the keyvalue.
+   * @param filter {@link Filter} to apply; may be <tt>null</tt>, in which case the cell is kept
+   * @param kv {@link KeyValue} on which to apply the filter
+   * @return <tt>null</tt> if the key should not be written, otherwise returns the original
+   *         {@link KeyValue}
+   */
+  public static Cell filterKv(Filter filter, Cell kv) throws IOException {
+    // apply the filter and skip this kv if the filter doesn't apply
+    if (filter != null) {
+      Filter.ReturnCode code = filter.filterKeyValue(kv);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Filter returned:" + code + " for the key value:" + kv);
+      }
+      // if it's not an accept type, then skip this kv
+      if (!(code.equals(Filter.ReturnCode.INCLUDE) || code
+          .equals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL))) {
+        return null;
+      }
+    }
+    return kv;
+  }
+
+  // helper: create a new KeyValue based on CF rename map
+  private static Cell convertKv(Cell kv, Map<byte[], byte[]> cfRenameMap) {
+    if(cfRenameMap != null) {
+      // If there's a rename mapping for this CF, create a new KeyValue
+      byte[] newCfName = cfRenameMap.get(CellUtil.cloneFamily(kv));
+      if(newCfName != null) {
+          kv = new KeyValue(kv.getRowArray(), // row buffer
+                  kv.getRowOffset(),        // row offset
+                  kv.getRowLength(),        // row length
+                  newCfName,                // CF buffer
+                  0,                        // CF offset
+                  newCfName.length,         // CF length
+                  kv.getQualifierArray(),   // qualifier buffer
+                  kv.getQualifierOffset(),  // qualifier offset
+                  kv.getQualifierLength(),  // qualifier length
+                  kv.getTimestamp(),        // timestamp
+                  KeyValue.Type.codeToType(kv.getTypeByte()), // KV Type
+                  kv.getValueArray(),       // value buffer
+                  kv.getValueOffset(),      // value offset
+                  kv.getValueLength());     // value length
+      }
+    }
+    return kv;
+  }
+
+  // helper: make a map from sourceCfName to destCfName by parsing a config key
+  private static Map<byte[], byte[]> createCfRenameMap(Configuration conf) {
+    Map<byte[], byte[]> cfRenameMap = null;
+    String allMappingsPropVal = conf.get(CF_RENAME_PROP);
+    if (allMappingsPropVal != null) {
+      // The conf value format should be sourceCf1:destCf1,sourceCf2:destCf2,...
+      String[] allMappings = allMappingsPropVal.split(",");
+      for (String mapping : allMappings) {
+        if (cfRenameMap == null) {
+          cfRenameMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+        }
+        String[] srcAndDest = mapping.split(":");
+        if (srcAndDest.length != 2) {
+          // silently skip malformed entries rather than failing the whole map
+          continue;
+        }
+        // use Bytes.toBytes rather than String.getBytes to avoid the platform default charset
+        cfRenameMap.put(Bytes.toBytes(srcAndDest[0]), Bytes.toBytes(srcAndDest[1]));
+      }
+    }
+    return cfRenameMap;
+  }
+
+  /**
+   * <p>Sets a configuration property with key {@link #CF_RENAME_PROP} in conf that tells
+   * the mapper how to rename column families.
+   *
+   * <p>Alternatively, instead of calling this function, you could set the configuration key
+   * {@link #CF_RENAME_PROP} yourself. The value should look like
+   * <pre>srcCf1:destCf1,srcCf2:destCf2,....</pre>. This would have the same effect on
+   * the mapper behavior.
+   *
+   * @param conf the Configuration in which the {@link #CF_RENAME_PROP} key will be set
+   * @param renameMap a mapping from source CF names to destination CF names
+   */
+  static public void configureCfRenaming(Configuration conf,
+          Map<String, String> renameMap) {
+    StringBuilder sb = new StringBuilder();
+    for(Map.Entry<String,String> entry: renameMap.entrySet()) {
+      String sourceCf = entry.getKey();
+      String destCf = entry.getValue();
+
+      if(sourceCf.contains(":") || sourceCf.contains(",") ||
+              destCf.contains(":") || destCf.contains(",")) {
+        throw new IllegalArgumentException("Illegal character in CF names: "
+              + sourceCf + ", " + destCf);
+      }
+
+      if(sb.length() != 0) {
+        sb.append(",");
+      }
+      sb.append(sourceCf + ":" + destCf);
+    }
+    conf.set(CF_RENAME_PROP, sb.toString());
+  }
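+  // Illustrative usage sketch (not part of this class): a caller that wants the exported family
+  // "a" written into family "x" on import could do something like
+  //   Map<String, String> renames = new HashMap<>();
+  //   renames.put("a", "x");
+  //   Import.configureCfRenaming(job.getConfiguration(), renames);
+  // which has the same effect as setting CF_RENAME_PROP to "a:x" directly.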
+
+  /**
+   * Add a Filter to be instantiated on import
+   * @param conf Configuration to update (will be passed to the job)
+   * @param clazz {@link Filter} subclass to instantiate on the server.
+   * @param filterArgs List of arguments to pass to the filter on instantiation
+   */
+  public static void addFilterAndArguments(Configuration conf, Class<? extends Filter> clazz,
+      List<String> filterArgs) throws IOException {
+    conf.set(Import.FILTER_CLASS_CONF_KEY, clazz.getName());
+    conf.setStrings(Import.FILTER_ARGS_CONF_KEY, filterArgs.toArray(new String[filterArgs.size()]));
+  }
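+  // Illustrative usage sketch (assumes a filter, e.g. org.apache.hadoop.hbase.filter.PrefixFilter,
+  // that provides the static createFilterFromArguments factory):
+  //   Import.addFilterAndArguments(conf, PrefixFilter.class,
+  //       Collections.singletonList("row-prefix"));
+  // which has the same effect as passing the filter class and args via -D on the command line
+  // (see usage below).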
+
+  /**
+   * Sets up the actual job.
+   * @param conf The current configuration.
+   * @param args The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public static Job createSubmittableJob(Configuration conf, String[] args)
+  throws IOException {
+    TableName tableName = TableName.valueOf(args[0]);
+    conf.set(TABLE_NAME, tableName.getNameAsString());
+    Path inputDir = new Path(args[1]);
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    job.setJarByClass(Importer.class);
+    FileInputFormat.setInputPaths(job, inputDir);
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+
+    // make sure we get the filter in the jars
+    try {
+      Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
+      if (filter != null) {
+        TableMapReduceUtil.addDependencyJarsForClasses(conf, filter);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
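+    // Three output modes follow: (1) bulk output with the large-result flag sorts KeyValues
+    // through a reduce phase with a custom comparator before writing HFiles, (2) plain bulk
+    // output writes HFiles via KeyValueSortReducer, and (3) otherwise the Importer mapper writes
+    // mutations straight to the table with no reducers.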
+    if (hfileOutPath != null && conf.getBoolean(HAS_LARGE_RESULT, false)) {
+      LOG.info("Use Large Result!!");
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+        job.setMapperClass(KeyValueSortImporter.class);
+        job.setReducerClass(KeyValueReducer.class);
+        Path outputDir = new Path(hfileOutPath);
+        FileOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(KeyValueWritableComparable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        job.getConfiguration().setClass("mapreduce.job.output.key.comparator.class",
+            KeyValueWritableComparable.KeyValueWritableComparator.class,
+            RawComparator.class);
+        Path partitionsPath =
+            new Path(TotalOrderPartitioner.getPartitionFile(job.getConfiguration()));
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+        fs.deleteOnExit(partitionsPath);
+        job.setPartitionerClass(KeyValueWritableComparablePartitioner.class);
+        job.setNumReduceTasks(regionLocator.getStartKeys().length);
+        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+            org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+      }
+    } else if (hfileOutPath != null) {
+      LOG.info("writing to hfiles for bulk load.");
+      job.setMapperClass(KeyValueImporter.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)){
+        job.setReducerClass(KeyValueSortReducer.class);
+        Path outputDir = new Path(hfileOutPath);
+        FileOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(KeyValue.class);
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+        TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+            org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+      }
+    } else {
+      LOG.info("writing directly to table from Mapper.");
+      // No reducers.  Just write straight to table.  Call initTableReducerJob
+      // because it sets up the TableOutputFormat.
+      job.setMapperClass(Importer.class);
+      TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
+      job.setNumReduceTasks(0);
+    }
+    return job;
+  }
+
+  /*
+   * @param errorMsg Error message.  Can be null.
+   */
+  private static void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: Import [options] <tablename> <inputdir>");
+    System.err.println("By default Import will load data directly into HBase. To instead generate");
+    System.err.println("HFiles of data to prepare for a bulk data load, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("If there is a large result that includes too much KeyValue "
+        + "whitch can occur OOME caused by the memery sort in reducer, pass the option:");
+    System.err.println("  -D" + HAS_LARGE_RESULT + "=true");
+    System.err
+        .println(" To apply a generic org.apache.hadoop.hbase.filter.Filter to the input, use");
+    System.err.println("  -D" + FILTER_CLASS_CONF_KEY + "=<name of filter class>");
+    System.err.println("  -D" + FILTER_ARGS_CONF_KEY + "=<comma separated list of args for filter");
+    System.err.println(" NOTE: The filter will be applied BEFORE doing key renames via the "
+        + CF_RENAME_PROP + " property. Futher, filters will only use the"
+        + " Filter#filterRowKey(byte[] buffer, int offset, int length) method to identify "
+        + " whether the current row needs to be ignored completely for processing and "
+        + " Filter#filterKeyValue(KeyValue) method to determine if the KeyValue should be added;"
+        + " Filter.ReturnCode#INCLUDE and #INCLUDE_AND_NEXT_COL will be considered as including"
+        + " the KeyValue.");
+    System.err.println("To import data exported from HBase 0.94, use");
+    System.err.println("  -Dhbase.import.version=0.94");
+    System.err.println("  -D " + JOB_NAME_CONF_KEY
+        + "=jobName - use the specified mapreduce job name for the import");
+    System.err.println("For performance consider the following options:\n"
+        + "  -Dmapreduce.map.speculative=false\n"
+        + "  -Dmapreduce.reduce.speculative=false\n"
+        + "  -D" + WAL_DURABILITY + "=<Used while writing data to hbase."
+            +" Allowed values are the supported durability values"
+            +" like SKIP_WAL/ASYNC_WAL/SYNC_WAL/...>");
+  }
+
+  /**
+   * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
+   * need to flush all the regions of the table as the data is held in memory and is also not
+   * present in the Write Ahead Log to replay in case of a crash. This method flushes all the
+   * regions of the table when data is imported to HBase with {@link Durability#SKIP_WAL}.
+   */
+  public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
+      InterruptedException {
+    String tableName = conf.get(TABLE_NAME);
+    String durability = conf.get(WAL_DURABILITY);
+    // Need to flush if the data is written to hbase and skip wal is enabled.
+    if (conf.get(BULK_OUTPUT_CONF_KEY) == null && durability != null
+        && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
+      LOG.info("Flushing all data that skipped the WAL.");
+      try (Connection connection = ConnectionFactory.createConnection(conf);
+          Admin hAdmin = connection.getAdmin()) {
+        hAdmin.flush(TableName.valueOf(tableName));
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      return -1;
+    }
+    String inputVersionString = System.getProperty(ResultSerialization.IMPORT_FORMAT_VER);
+    if (inputVersionString != null) {
+      getConf().set(ResultSerialization.IMPORT_FORMAT_VER, inputVersionString);
+    }
+    Job job = createSubmittableJob(getConf(), args);
+    boolean isJobSuccessful = job.waitForCompletion(true);
+    if (isJobSuccessful) {
+      // Flush all the regions of the table
+      flushRegionsIfNecessary(getConf());
+    }
+    long inputRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
+    long outputRecords = job.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
+    if (outputRecords < inputRecords) {
+      System.err.println("Warning, not all records were imported (maybe filtered out).");
+      if (outputRecords == 0) {
+        System.err.println("If the data was exported from HBase 0.94 "+
+            "consider using -Dhbase.import.version=0.94.");
+      }
+    }
+
+    return (isJobSuccessful ? 0 : 1);
+  }
+
+  /**
+   * Main entry point.
+   * @param args The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Import(), args);
+    System.exit(errCode);
+  }
+
+}