You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/06/13 03:24:38 UTC

[3/5] hbase git commit: HBASE-13639 SyncTable - rsync for HBase tables

HBASE-13639 SyncTable - rsync for HBase tables

Signed-off-by: Andrew Purtell <ap...@apache.org>
Amending-Author: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ae121ee7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ae121ee7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ae121ee7

Branch: refs/heads/branch-1
Commit: ae121ee7e9825a138e1fff4b07164e20cdd5b27c
Parents: de83a78
Author: Dave Latham <da...@yahoo-inc.com>
Authored: Fri Jun 12 16:00:01 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jun 12 16:00:01 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/CellComparator.java |   4 +-
 .../java/org/apache/hadoop/hbase/CellUtil.java  |   4 +
 .../org/apache/hadoop/hbase/util/Bytes.java     |  54 +-
 .../org/apache/hadoop/hbase/util/TestBytes.java |  48 ++
 .../hadoop/hbase/mapreduce/HashTable.java       | 747 ++++++++++++++++++
 .../hadoop/hbase/mapreduce/SyncTable.java       | 773 +++++++++++++++++++
 .../hadoop/hbase/mapreduce/TestHashTable.java   | 192 +++++
 .../hadoop/hbase/mapreduce/TestSyncTable.java   | 334 ++++++++
 8 files changed, 2141 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index ca4cfb1..6536192 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -245,9 +245,7 @@ public class CellComparator implements Comparator<Cell>, Serializable {
    *         0 if both timestamps are equal
    */
   public static int compareTimestamps(final Cell left, final Cell right) {
-    long ltimestamp = left.getTimestamp();
-    long rtimestamp = right.getTimestamp();
-    return compareTimestamps(ltimestamp, rtimestamp);
+    return compareTimestamps(left.getTimestamp(), right.getTimestamp());
   }
 
   /********************* hashCode ************************/

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index b0eece8..591045f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -450,6 +450,10 @@ public final class CellUtil {
         buf.length);
   }
 
+  /**
+   * Tests whether two cells carry the same timestamp.
+   * @param a first cell
+   * @param b second cell
+   * @return true when {@link CellComparator#compareTimestamps(Cell, Cell)} reports the
+   *         timestamps of both cells as equal
+   */
+  public static boolean matchingTimestamp(Cell a, Cell b) {
+    final int timestampOrder = CellComparator.compareTimestamps(a, b);
+    return timestampOrder == 0;
+  }
+
   /**
    * @return True if a delete type, a {@link KeyValue.Type#Delete} or a
    *         {KeyValue.Type#DeleteFamily} or a

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index ff54ebe..004ed27 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -457,7 +457,7 @@ public class Bytes {
     if (off + len > b.length) len = b.length - off;
     for (int i = off; i < off + len ; ++i ) {
       int ch = b[i] & 0xFF;
-      if ( (ch >= '0' && ch <= '9')
+      if ((ch >= '0' && ch <= '9')
           || (ch >= 'A' && ch <= 'Z')
           || (ch >= 'a' && ch <= 'z')
           || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
@@ -2339,14 +2339,47 @@ public class Bytes {
     }
     return result;
   }
-  
+
+  private static final char[] HEX_CHARS = {
+    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+  };
+
+  /**
+   * Render a range of a byte array as lower-case hexadecimal text, two characters per byte.
+   * @param b array holding the bytes to render
+   * @param offset index of the first byte to render
+   * @param length number of bytes to render
+   * @return a string of {@code length * 2} hex characters
+   */
+  public static String toHex(byte[] b, int offset, int length) {
+    checkArgument(length <= Integer.MAX_VALUE / 2);
+    char[] hex = new char[length * 2];
+    for (int byteIndex = 0; byteIndex < length; byteIndex++) {
+      byte current = b[offset + byteIndex];
+      // high nibble first, then low nibble
+      hex[2 * byteIndex] = HEX_CHARS[(current >> 4) & 0x0F];
+      hex[2 * byteIndex + 1] = HEX_CHARS[current & 0x0F];
+    }
+    return new String(hex);
+  }
+
   /**
    * Convert a byte array into a hex string
-   * @param b
    */
   public static String toHex(byte[] b) {
-    checkArgument(b.length > 0, "length must be greater than 0");
-    return String.format("%x", new BigInteger(1, b));
+    return toHex(b, 0, b.length);
+  }
+
+  /**
+   * Decode one hexadecimal digit (upper or lower case) into its 4-bit value.
+   * @param ch the character to decode
+   * @return a value between 0 and 15
+   * @throws IllegalArgumentException if {@code ch} is not a hexadecimal digit
+   */
+  private static int hexCharToNibble(char ch) {
+    if ('0' <= ch && ch <= '9') {
+      return ch - '0';
+    }
+    if ('a' <= ch && ch <= 'f') {
+      return 10 + (ch - 'a');
+    }
+    if ('A' <= ch && ch <= 'F') {
+      return 10 + (ch - 'A');
+    }
+    throw new IllegalArgumentException("Invalid hex char: " + ch);
+  }
+
+  /**
+   * Combine two hexadecimal digits into one byte; {@code c1} supplies the high nibble and
+   * {@code c2} the low nibble.
+   */
+  private static byte hexCharsToByte(char c1, char c2) {
+    int highNibble = hexCharToNibble(c1);
+    int lowNibble = hexCharToNibble(c2);
+    return (byte) ((highNibble << 4) | lowNibble);
   }
 
   /**
@@ -2355,14 +2388,11 @@ public class Bytes {
    * @param hex
    */
   public static byte[] fromHex(String hex) {
-    checkArgument(hex.length() > 0, "length must be greater than 0");
     checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2");
-    // Make sure letters are upper case
-    hex = hex.toUpperCase();
-    byte[] b = new byte[hex.length() / 2];
-    for (int i = 0; i < b.length; i++) {
-      b[i] = (byte)((toBinaryFromHex((byte)hex.charAt(2 * i)) << 4) +
-        toBinaryFromHex((byte)hex.charAt((2 * i + 1))));
+    int len = hex.length();
+    byte[] b = new byte[len / 2];
+    for (int i = 0; i < len; i += 2) {
+        b[i / 2] = hexCharsToByte(hex.charAt(i),hex.charAt(i+1));
     }
     return b;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
index 75baa03..3ec0afb 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
@@ -24,7 +24,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 
 import junit.framework.TestCase;
@@ -501,5 +503,51 @@ public class TestBytes extends TestCase {
       Assert.assertEquals(i, b[i]);
     }
   }
+  
+  /**
+   * Round-trips data through {@link Bytes#fromHex(String)} and {@link Bytes#toHex(byte[])}
+   * in both directions, including the empty string and the empty array.
+   */
+  public void testToFromHex() {
+    List<String> testStrings = new ArrayList<String>();
+    testStrings.addAll(Arrays.asList(new String[] {
+        "",
+        "00",
+        "A0",
+        "ff",
+        "FFffFFFFFFFFFF",
+        "12",
+        "0123456789abcdef",
+        "283462839463924623984692834692346ABCDFEDDCA0",
+      }));
+    for (String testString : testStrings) {
+      byte[] byteData = Bytes.fromHex(testString);
+      Assert.assertEquals(testString.length() / 2, byteData.length);
+      String result = Bytes.toHex(byteData);
+      // toHex emits lower case, so compare without regard to case
+      Assert.assertTrue("hex round trip turned " + testString + " into " + result,
+        testString.equalsIgnoreCase(result));
+    }
+
+    List<byte[]> testByteData = new ArrayList<byte[]>();
+    testByteData.addAll(Arrays.asList(new byte[][] {
+      new byte[0],
+      new byte[1],
+      new byte[10],
+      new byte[] {1, 2, 3, 4, 5},
+      new byte[] {(byte) 0xFF},
+    }));
+    // fixed seed: the generated arrays are identical on every run, so a failure is reproducible
+    Random r = new Random(13639L);
+    for (int i = 0; i < 20; i++) {
+      byte[] bytes = new byte[r.nextInt(100)];
+      r.nextBytes(bytes);
+      testByteData.add(bytes);
+    }
+
+    for (byte[] testData : testByteData) {
+      String hexString = Bytes.toHex(testData);
+      Assert.assertEquals(testData.length * 2, hexString.length());
+      byte[] result = Bytes.fromHex(hexString);
+      Assert.assertArrayEquals(testData, result);
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
new file mode 100644
index 0000000..20ae4a6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -0,0 +1,747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Ordering;
+
+public class HashTable extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(HashTable.class);
+  
+  // default target number of bytes hashed into one batch (see the usage text)
+  private static final int DEFAULT_BATCH_SIZE = 8000;
+  
+  // job configuration key used to hand the target batch size to the mappers
+  private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
+  // file under the output path holding the partition keys between hash files
+  final static String PARTITIONS_FILE_NAME = "partitions";
+  // properties file under the output path describing how the hashes were produced
+  final static String MANIFEST_FILE_NAME = "manifest";
+  // sub-directory of the output path that receives the job's MapFile output
+  final static String HASH_DATA_DIR = "hashes";
+  // prefix of the per-file names built by TableHash.getDataFileName
+  final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
+  // the manifest is first written under this name and renamed by completeManifest()
+  private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
+  
+  // parameters of the hash run; filled in from the command line
+  TableHash tableHash = new TableHash();
+  // output directory given on the command line
+  Path destPath;
+  
+  /**
+   * Create the tool with the configuration it passes on to {@link Configured}.
+   * @param conf configuration later returned by {@code getConf()}
+   */
+  public HashTable(Configuration conf) {
+    super(conf);
+  }
+  
+  public static class TableHash {
+    
+    // directory the manifest, partition file and hash data are read from (set by read())
+    Path hashDir;
+    
+    String tableName;
+    // comma-separated family names; null means no family restriction is applied to the scan
+    String families = null;
+    long batchSize = DEFAULT_BATCH_SIZE;
+    // 0 means "not chosen yet"; selectPartitions() then derives it from the region count
+    int numHashFiles = 0;
+    byte[] startRow = HConstants.EMPTY_START_ROW;
+    byte[] stopRow = HConstants.EMPTY_END_ROW;
+    // values <= 0 leave the scan's batch setting untouched
+    int scanBatch = 0;
+    // negative means the scan's max versions setting is left untouched
+    int versions = -1;
+    // 0 means unset for both ends of the time range
+    long startTime = 0;
+    long endTime = 0;
+    
+    // keys separating consecutive hash files; filled by selectPartitions() or read()
+    List<ImmutableBytesWritable> partitions;
+    
+    /**
+     * Load the description of an existing hash output directory.
+     * Reads the manifest properties and then the partition file, both located directly
+     * under {@code hashDir}.
+     * @param conf configuration used to resolve the file system and read the partition file
+     * @param hashDir directory containing the manifest and partition files
+     * @return a TableHash populated from those two files
+     * @throws IOException if either file cannot be read or the partitions are not ordered
+     */
+    public static TableHash read(Configuration conf, Path hashDir) throws IOException {
+      FileSystem fileSystem = hashDir.getFileSystem(conf);
+      Path manifest = new Path(hashDir, MANIFEST_FILE_NAME);
+      Path partitionFile = new Path(hashDir, PARTITIONS_FILE_NAME);
+
+      TableHash loaded = new TableHash();
+      loaded.hashDir = hashDir;
+      loaded.readPropertiesFile(fileSystem, manifest);
+      loaded.readPartitionFile(fileSystem, conf, partitionFile);
+      return loaded;
+    }
+    
+    /**
+     * Write the parameters of this hash run as a properties file.
+     * Optional settings (families, start/stop row, scan batch, versions, time range) are
+     * only written when they differ from their "unset" value.
+     * @param fs file system to create the file on
+     * @param path location of the properties file
+     * @throws IOException if the file cannot be created or written
+     */
+    void writePropertiesFile(FileSystem fs, Path path) throws IOException {
+      Properties p = new Properties();
+      p.setProperty("table", tableName);
+      if (families != null) {
+        p.setProperty("columnFamilies", families);
+      }
+      p.setProperty("targetBatchSize", Long.toString(batchSize));
+      p.setProperty("numHashFiles", Integer.toString(numHashFiles));
+      if (!isTableStartRow(startRow)) {
+        p.setProperty("startRowHex", Bytes.toHex(startRow));
+      }
+      if (!isTableEndRow(stopRow)) {
+        p.setProperty("stopRowHex", Bytes.toHex(stopRow));
+      }
+      if (scanBatch > 0) {
+        p.setProperty("scanBatch", Integer.toString(scanBatch));
+      }
+      if (versions >= 0) {
+        p.setProperty("versions", Integer.toString(versions));
+      }
+      if (startTime != 0) {
+        p.setProperty("startTimestamp", Long.toString(startTime));
+      }
+      if (endTime != 0) {
+        p.setProperty("endTimestamp", Long.toString(endTime));
+      }
+
+      FSDataOutputStream out = fs.create(path);
+      try {
+        // Properties.store flushes the writer before returning, so closing the
+        // underlying stream afterwards loses no data
+        p.store(new OutputStreamWriter(out, Charsets.UTF_8), null);
+      } finally {
+        // release the stream even when store() fails
+        out.close();
+      }
+    }
+    
+    /**
+     * Populate this object from a properties file written by writePropertiesFile.
+     * "table", "targetBatchSize" and "numHashFiles" are always read; the remaining
+     * properties only override the current field value when present.
+     * @param fs file system holding the file
+     * @param path location of the properties file
+     * @throws IOException if the file cannot be opened or read
+     */
+    void readPropertiesFile(FileSystem fs, Path path) throws IOException {
+      Properties p = new Properties();
+      FSDataInputStream in = fs.open(path);
+      try {
+        p.load(new InputStreamReader(in, Charsets.UTF_8));
+      } finally {
+        // release the stream even when load() fails
+        in.close();
+      }
+
+      tableName = p.getProperty("table");
+      families = p.getProperty("columnFamilies");
+      batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
+      numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
+
+      String startRowHex = p.getProperty("startRowHex");
+      if (startRowHex != null) {
+        startRow = Bytes.fromHex(startRowHex);
+      }
+      String stopRowHex = p.getProperty("stopRowHex");
+      if (stopRowHex != null) {
+        stopRow = Bytes.fromHex(stopRowHex);
+      }
+
+      String scanBatchString = p.getProperty("scanBatch");
+      if (scanBatchString != null) {
+        scanBatch = Integer.parseInt(scanBatchString);
+      }
+
+      String versionString = p.getProperty("versions");
+      if (versionString != null) {
+        versions = Integer.parseInt(versionString);
+      }
+
+      String startTimeString = p.getProperty("startTimestamp");
+      if (startTimeString != null) {
+        startTime = Long.parseLong(startTimeString);
+      }
+
+      String endTimeString = p.getProperty("endTimestamp");
+      if (endTimeString != null) {
+        endTime = Long.parseLong(endTimeString);
+      }
+    }
+    
+    /**
+     * Build the Scan described by this object's fields.
+     * Block caching is always disabled; time range, batch, max versions, start/stop row
+     * and families are only applied when the corresponding field is set.
+     */
+    Scan initScan() throws IOException {
+      Scan scan = new Scan();
+      scan.setCacheBlocks(false);
+
+      boolean timeRangeRequested = !(startTime == 0 && endTime == 0);
+      if (timeRangeRequested) {
+        // an unset end time means "up to the latest timestamp"
+        long upperBound = endTime;
+        if (upperBound == 0) {
+          upperBound = HConstants.LATEST_TIMESTAMP;
+        }
+        scan.setTimeRange(startTime, upperBound);
+      }
+      if (scanBatch > 0) {
+        scan.setBatch(scanBatch);
+      }
+      if (versions >= 0) {
+        scan.setMaxVersions(versions);
+      }
+      if (!isTableStartRow(startRow)) {
+        scan.setStartRow(startRow);
+      }
+      if (!isTableEndRow(stopRow)) {
+        scan.setStopRow(stopRow);
+      }
+      if (families != null) {
+        String[] familyNames = families.split(",");
+        for (int i = 0; i < familyNames.length; i++) {
+          scan.addFamily(Bytes.toBytes(familyNames[i]));
+        }
+      }
+      return scan;
+    }
+    
+    /**
+     * Choose partitions between row ranges to hash to a single output file.
+     * Selects region boundaries that fall within the scan range, and groups them
+     * into the desired number of partitions.  On return {@code numHashFiles} holds the
+     * number of hash files that will be produced (always at least 1) and
+     * {@code partitions} holds the {@code numHashFiles - 1} keys separating them.
+     * @param regionStartEndKeys start keys (first) and end keys (second) of the table's
+     *          regions, index-aligned
+     */
+    void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
+      List<byte[]> startKeys = new ArrayList<byte[]>();
+      for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
+        byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
+        byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
+
+        // if scan begins after this region, or ends before this region, then drop this region
+        // in other words:
+        //   IF (scan begins before the end of this region
+        //      AND scan ends after the start of this region)
+        //   THEN include this region
+        if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
+            || Bytes.compareTo(startRow, regionEndKey) < 0)
+          && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
+            || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+          startKeys.add(regionStartKey);
+        }
+      }
+
+      int numRegions = startKeys.size();
+      if (numHashFiles == 0) {
+        numHashFiles = numRegions / 100;
+      }
+      if (numHashFiles > numRegions) {
+        // can't partition within regions
+        numHashFiles = numRegions;
+      }
+      if (numHashFiles < 1) {
+        // always produce at least one hash file; this also keeps the list capacity below
+        // non-negative when no region was selected or a negative count was configured
+        numHashFiles = 1;
+      }
+
+      // choose a subset of start keys to group regions into ranges
+      partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
+      // skip the first start key as it is not a partition between ranges.
+      for (long i = 1; i < numHashFiles; i++) {
+        int splitIndex = (int) (numRegions * i / numHashFiles);
+        partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
+      }
+    }
+    
+    /**
+     * Write the partition keys, in list order, to a SequenceFile keyed by
+     * ImmutableBytesWritable with NullWritable values.
+     * @param conf configuration used to resolve the file system and create the writer
+     * @param path location of the partition file
+     * @throws IOException if the file cannot be created or written
+     */
+    void writePartitionFile(Configuration conf, Path path) throws IOException {
+      FileSystem fs = path.getFileSystem(conf);
+      @SuppressWarnings("deprecation")
+      SequenceFile.Writer writer = SequenceFile.createWriter(
+        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
+      try {
+        for (int i = 0; i < partitions.size(); i++) {
+          writer.append(partitions.get(i), NullWritable.get());
+        }
+      } finally {
+        // release the writer even when an append fails
+        writer.close();
+      }
+    }
+    
+    /**
+     * Load the partition keys from a SequenceFile and verify that they are sorted.
+     * @param fs file system holding the file
+     * @param conf configuration handed to the SequenceFile reader
+     * @param path location of the partition file
+     * @throws IOException if the file cannot be read or the keys are not in natural order
+     */
+    private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
+         throws IOException {
+      @SuppressWarnings("deprecation")
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      try {
+        ImmutableBytesWritable key = new ImmutableBytesWritable();
+        partitions = new ArrayList<ImmutableBytesWritable>();
+        while (reader.next(key)) {
+          // the reader reuses 'key', so store a copy of its bytes
+          partitions.add(new ImmutableBytesWritable(key.copyBytes()));
+        }
+      } finally {
+        // release the reader even when a read fails
+        reader.close();
+      }
+
+      if (!Ordering.natural().isOrdered(partitions)) {
+        throw new IOException("Partitions are not ordered!");
+      }
+    }
+
+    /**
+     * Describe this hash run as a comma-separated list of {@code name=value} pairs.
+     * Start/stop row, versions and the time range are only included when set.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("tableName=").append(tableName);
+      if (families != null) {
+        sb.append(", families=").append(families);
+      }
+      sb.append(", batchSize=").append(batchSize);
+      sb.append(", numHashFiles=").append(numHashFiles);
+      if (!isTableStartRow(startRow)) {
+        sb.append(", startRowHex=").append(Bytes.toHex(startRow));
+      }
+      if (!isTableEndRow(stopRow)) {
+        sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
+      }
+      if (scanBatch >= 0) {
+        sb.append(", scanBatch=").append(scanBatch);
+      }
+      if (versions >= 0) {
+        sb.append(", versions=").append(versions);
+      }
+      // the two time entries need the same ", " separator as every other entry
+      if (startTime != 0) {
+        sb.append(", startTime=").append(startTime);
+      }
+      if (endTime != 0) {
+        sb.append(", endTime=").append(endTime);
+      }
+      return sb.toString();
+    }
+    
+    /**
+     * Name of the hash data file with the given index: the output file prefix followed by
+     * the index zero-padded to five digits.
+     */
+    static String getDataFileName(int hashFileIndex) {
+      final String pattern = HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d";
+      return String.format(pattern, hashFileIndex);
+    }
+    
+    /**
+     * Open a TableHash.Reader positioned at the first hash at or after the given key.
+     * @param conf configuration used to open the hash data files
+     * @param startKey key to start reading from
+     * @return a new reader; the caller is responsible for closing it
+     * @throws IOException if the hash data file cannot be opened or searched
+     */
+    public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
+        throws IOException {
+      Reader reader = new Reader(conf, startKey);
+      return reader;
+    }
+    
+    /**
+     * Sequential reader over the key/hash pairs stored in the hash data files of the
+     * enclosing TableHash.  Starts at a given key and moves on to the following data
+     * files as each one is exhausted.
+     */
+    public class Reader implements java.io.Closeable {
+      private final Configuration conf;
+
+      // index of the data file currently open in mapFileReader
+      private int hashFileIndex;
+      private MapFile.Reader mapFileReader;
+
+      // true while the pair found by the constructor has not yet been handed out by next()
+      private boolean cachedNext;
+      private ImmutableBytesWritable key;
+      private ImmutableBytesWritable hash;
+
+      Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
+        this.conf = conf;
+        int partitionIndex = Collections.binarySearch(partitions, startKey);
+        if (partitionIndex >= 0) {
+          // if the key is equal to a partition, then go the file after that partition
+          hashFileIndex = partitionIndex+1;
+        } else {
+          // if the key is between partitions, then go to the file between those partitions
+          hashFileIndex = -1-partitionIndex;
+        }
+        openHashFile();
+
+        // MapFile's don't make it easy to seek() so that the subsequent next() returns
+        // the desired key/value pair.  So we cache it for the first call of next().
+        hash = new ImmutableBytesWritable();
+        boolean positioned = false;
+        try {
+          key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
+          positioned = true;
+        } finally {
+          if (!positioned) {
+            // the caller never receives this object, so nobody else could close the file
+            mapFileReader.close();
+          }
+        }
+        if (key == null) {
+          cachedNext = false;
+          hash = null;
+        } else {
+          cachedNext = true;
+        }
+      }
+
+      /**
+       * Read the next key/hash pair.
+       * Returns true if such a pair exists and false when at the end of the data.
+       */
+      public boolean next() throws IOException {
+        if (cachedNext) {
+          cachedNext = false;
+          return true;
+        }
+        key = new ImmutableBytesWritable();
+        hash = new ImmutableBytesWritable();
+        while (true) {
+          boolean hasNext = mapFileReader.next(key, hash);
+          if (hasNext) {
+            return true;
+          }
+          hashFileIndex++;
+          if (hashFileIndex < TableHash.this.numHashFiles) {
+            // openHashFile() closes the exhausted reader before opening the next file
+            openHashFile();
+          } else {
+            key = null;
+            hash = null;
+            return false;
+          }
+        }
+      }
+
+      /**
+       * Get the current key
+       * @return the current key or null if there is no current key
+       */
+      public ImmutableBytesWritable getCurrentKey() {
+        return key;
+      }
+
+      /**
+       * Get the current hash
+       * @return the current hash or null if there is no current hash
+       */
+      public ImmutableBytesWritable getCurrentHash() {
+        return hash;
+      }
+
+      /**
+       * Close the currently open data file, if any, and open the one at hashFileIndex.
+       */
+      private void openHashFile() throws IOException {
+        if (mapFileReader != null) {
+          mapFileReader.close();
+        }
+        Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
+        Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
+        mapFileReader = new MapFile.Reader(dataFile, conf);
+      }
+
+      @Override
+      public void close() throws IOException {
+        mapFileReader.close();
+      }
+    }
+  }
+  
+  /**
+   * @return true if the given row equals {@link HConstants#EMPTY_START_ROW}
+   */
+  static boolean isTableStartRow(byte[] row) {
+    final byte[] tableStart = HConstants.EMPTY_START_ROW;
+    return Bytes.equals(tableStart, row);
+  }
+  
+  /**
+   * @return true if the given row equals {@link HConstants#EMPTY_END_ROW}
+   */
+  static boolean isTableEndRow(byte[] row) {
+    final byte[] tableEnd = HConstants.EMPTY_END_ROW;
+    return Bytes.equals(tableEnd, row);
+  }
+  
+  /**
+   * Build the MapReduce job that hashes the table into MapFiles under the
+   * HASH_DATA_DIR sub-directory of destPath.  The partition file is generated and
+   * written first.
+   * @param args not read by this method
+   * @return the configured, not yet submitted job
+   * @throws IOException if the partitions cannot be generated or the job cannot be set up
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
+    // must run before the job is configured: selecting the partitions settles
+    // tableHash.numHashFiles, which is used below as the number of reducers
+    generatePartitions(partitionsPath);
+    
+    Job job = Job.getInstance(getConf(),
+          getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
+    Configuration jobConf = job.getConfiguration();
+    // the mappers read the target batch size back from the job configuration in setup()
+    jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
+    job.setJarByClass(HashTable.class);
+
+    TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
+        HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    
+    // use a TotalOrderPartitioner and reducers to group region output into hash files
+    job.setPartitionerClass(TotalOrderPartitioner.class);
+    TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
+    job.setReducerClass(Reducer.class);  // identity reducer
+    job.setNumReduceTasks(tableHash.numHashFiles);
+    job.setOutputKeyClass(ImmutableBytesWritable.class);
+    job.setOutputValueClass(ImmutableBytesWritable.class);
+    job.setOutputFormatClass(MapFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
+    
+    return job;
+  }
+  
+  /**
+   * Look up the table's region boundaries, derive the partition keys from them and write
+   * those keys to the given partition file.
+   * @param partitionsPath location of the partition file to write
+   * @throws IOException if the region boundaries cannot be fetched or the file not written
+   */
+  private void generatePartitions(Path partitionsPath) throws IOException {
+    Pair<byte[][], byte[][]> regionKeys;
+    Connection connection = ConnectionFactory.createConnection(getConf());
+    try {
+      // fully qualified so that no additional import is required
+      org.apache.hadoop.hbase.client.RegionLocator locator =
+          connection.getRegionLocator(TableName.valueOf(tableHash.tableName));
+      try {
+        regionKeys = locator.getStartEndKeys();
+      } finally {
+        locator.close();
+      }
+    } finally {
+      // release the connection even when the lookup fails
+      connection.close();
+    }
+
+    tableHash.selectPartitions(regionKeys);
+    LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
+
+    tableHash.writePartitionFile(getConf(), partitionsPath);
+  }
+  
+  /**
+   * Accumulates an MD5 digest over the cells of a batch of Results.
+   * A batch is opened with startBatch, fed with hashResult and closed with finishBatch,
+   * after which the batch start key and hash can be read back.
+   */
+  static class ResultHasher {
+    private MessageDigest digest;
+
+    private boolean batchStarted = false;
+    private ImmutableBytesWritable batchStartKey;
+    private ImmutableBytesWritable batchHash;
+    private long batchSize = 0;
+
+    public ResultHasher() {
+      try {
+        digest = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+        throw Throwables.propagate(e);
+      }
+    }
+
+    /**
+     * Open a new batch beginning at the given row; the previous batch must be finished.
+     */
+    public void startBatch(ImmutableBytesWritable row) {
+      if (batchStarted) {
+        throw new RuntimeException("Cannot start new batch without finishing existing one.");
+      }
+      batchStartKey = row;
+      batchHash = null;
+      batchSize = 0;
+      batchStarted = true;
+    }
+
+    /**
+     * Add every cell of the result to the running digest: row, family, qualifier, the
+     * eight timestamp bytes (least significant first) and the value, in that order.
+     */
+    public void hashResult(Result result) {
+      if (!batchStarted) {
+        throw new RuntimeException("Cannot add to batch that has not been started.");
+      }
+      for (Cell cell : result.rawCells()) {
+        int rowLen = cell.getRowLength();
+        int famLen = cell.getFamilyLength();
+        int qualLen = cell.getQualifierLength();
+        int valLen = cell.getValueLength();
+
+        digest.update(cell.getRowArray(), cell.getRowOffset(), rowLen);
+        digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), famLen);
+        digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualLen);
+        long timestamp = cell.getTimestamp();
+        for (int shift = 0; shift < Long.SIZE; shift += Byte.SIZE) {
+          digest.update((byte) (timestamp >>> shift));
+        }
+        digest.update(cell.getValueArray(), cell.getValueOffset(), valLen);
+
+        batchSize += rowLen + famLen + qualLen + 8 + valLen;
+      }
+    }
+
+    /**
+     * Close the current batch and capture its digest as the batch hash.
+     */
+    public void finishBatch() {
+      if (!batchStarted) {
+        throw new RuntimeException("Cannot finish batch that has not started.");
+      }
+      batchHash = new ImmutableBytesWritable(digest.digest());
+      batchStarted = false;
+    }
+
+    public boolean isBatchStarted() {
+      return batchStarted;
+    }
+
+    public ImmutableBytesWritable getBatchStartKey() {
+      return batchStartKey;
+    }
+
+    public ImmutableBytesWritable getBatchHash() {
+      return batchHash;
+    }
+
+    public long getBatchSize() {
+      return batchSize;
+    }
+  }
+  
+  /**
+   * Mapper that hashes the scanned Results in batches and emits one
+   * (batch start key, batch hash) pair per batch.  A batch is only closed on a row
+   * boundary once its accumulated size has reached the target batch size.
+   */
+  public static class HashMapper
+    extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    private ResultHasher hasher;
+    private long targetBatchSize;
+
+    private ImmutableBytesWritable currentRow;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      targetBatchSize = conf.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+      hasher = new ResultHasher();
+
+      // the first batch starts at the start row of this mapper's split
+      TableSplit split = (TableSplit) context.getInputSplit();
+      ImmutableBytesWritable splitStart = new ImmutableBytesWritable(split.getStartRow());
+      hasher.startBatch(splitStart);
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      boolean rowChanged = currentRow == null || !currentRow.equals(key);
+      if (rowChanged) {
+        currentRow = new ImmutableBytesWritable(key); // not immutable
+        boolean batchFull = hasher.getBatchSize() >= targetBatchSize;
+        if (batchFull) {
+          emitBatch(context);
+          hasher.startBatch(currentRow);
+        }
+      }
+      hasher.hashResult(value);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      emitBatch(context);
+    }
+
+    // finish the running batch and write its start key and hash to the job output
+    private void emitBatch(Context context) throws IOException, InterruptedException {
+      hasher.finishBatch();
+      context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+    }
+  }
+  
+  /**
+   * Write the hash run's properties to the temporary manifest file under destPath.
+   */
+  private void writeTempManifestFile() throws IOException {
+    Path tmpManifest = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+    FileSystem manifestFs = tmpManifest.getFileSystem(getConf());
+    tableHash.writePropertiesFile(manifestFs, tmpManifest);
+  }
+  
+  /**
+   * Rename the temporary manifest file to its final name under destPath.
+   * @throws IOException if the rename fails or is reported as unsuccessful
+   */
+  private void completeManifest() throws IOException {
+    Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+    Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
+    FileSystem fs = tempManifestPath.getFileSystem(getConf());
+    // rename() reports some failures through its return value instead of an exception;
+    // without this check the output directory would silently lack its manifest
+    if (!fs.rename(tempManifestPath, manifestPath)) {
+      throw new IOException("Failed to rename " + tempManifestPath + " to " + manifestPath);
+    }
+  }
+  
+  private static final int NUM_ARGS = 2;
+  /**
+   * Print the command line help to standard error, preceded by the given error message
+   * when it is neither null nor empty.
+   */
+  private static void printUsage(final String errorMsg) {
+    boolean hasError = errorMsg != null && errorMsg.length() > 0;
+    if (hasError) {
+      System.err.println("ERROR: " + errorMsg);
+      System.err.println();
+    }
+    // one entry per output line; an empty entry prints a blank line
+    String[] usageLines = {
+      "Usage: HashTable [options] <tablename> <outputpath>",
+      "",
+      "Options:",
+      " batchsize     the target amount of bytes to hash in each batch",
+      "               rows are added to the batch until this size is reached",
+      "               (defaults to " + DEFAULT_BATCH_SIZE + " bytes)",
+      " numhashfiles  the number of hash files to create",
+      "               if set to fewer than number of regions then",
+      "               the job will create this number of reducers",
+      "               (defaults to 1/100 of regions -- at least 1)",
+      " startrow      the start row",
+      " stoprow       the stop row",
+      " starttime     beginning of the time range (unixtime in millis)",
+      "               without endtime means from starttime to forever",
+      " endtime       end of the time range.  Ignored if no starttime specified.",
+      " scanbatch     scanner batch size to support intra row scans",
+      " versions      number of cell versions to include",
+      " families      comma-separated list of families to include",
+      "",
+      "Args:",
+      " tablename     Name of the table to hash",
+      " outputpath    Filesystem path to put the output data",
+      "",
+      "Examples:",
+      " To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:",
+      " $ bin/hbase " +
+        "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
+        + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
+        + " TestTable /hashes/testTable",
+    };
+    for (String usageLine : usageLines) {
+      System.err.println(usageLine);
+    }
+  }
+
+  /**
+   * Parses the tool's command line into {@code tableHash} and {@code destPath}.
+   * <p>
+   * The last two arguments are the table name and the output path; everything before
+   * them must be a {@code --name=value} option or a help flag.
+   *
+   * @param args the arguments remaining after generic option parsing
+   * @return true if the arguments were accepted; false if usage was printed because of a
+   *         help request, an unknown option, an invalid time range or a parse failure
+   */
+  private boolean doCommandLine(final String[] args) {
+    if (args.length < NUM_ARGS) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      
+      tableHash.tableName = args[args.length-2];
+      destPath = new Path(args[args.length-1]);
+      
+      for (int i = 0; i < args.length - NUM_ARGS; i++) {
+        String cmd = args[i];
+        if (cmd.equals("-h") || cmd.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+        
+        final String batchSizeArgKey = "--batchsize=";
+        if (cmd.startsWith(batchSizeArgKey)) {
+          tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
+          continue;
+        }
+        
+        final String numHashFilesArgKey = "--numhashfiles=";
+        if (cmd.startsWith(numHashFilesArgKey)) {
+          tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
+          continue;
+        }
+         
+        // start and stop rows are given as hex strings
+        final String startRowArgKey = "--startrow=";
+        if (cmd.startsWith(startRowArgKey)) {
+          tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
+          continue;
+        }
+        
+        final String stopRowArgKey = "--stoprow=";
+        if (cmd.startsWith(stopRowArgKey)) {
+          tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
+          continue;
+        }
+        
+        final String startTimeArgKey = "--starttime=";
+        if (cmd.startsWith(startTimeArgKey)) {
+          tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+          continue;
+        }
+
+        final String endTimeArgKey = "--endtime=";
+        if (cmd.startsWith(endTimeArgKey)) {
+          tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+          continue;
+        }
+
+        final String scanBatchArgKey = "--scanbatch=";
+        if (cmd.startsWith(scanBatchArgKey)) {
+          tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
+          continue;
+        }
+
+        final String versionsArgKey = "--versions=";
+        if (cmd.startsWith(versionsArgKey)) {
+          tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+          continue;
+        }
+
+        final String familiesArgKey = "--families=";
+        if (cmd.startsWith(familiesArgKey)) {
+          tableHash.families = cmd.substring(familiesArgKey.length());
+          continue;
+        }
+
+        printUsage("Invalid argument '" + cmd + "'");
+        return false;
+      }
+      // The usage text documents a starttime without an endtime as "from starttime to
+      // forever", so an unset (zero) endtime must not be treated as an inverted range.
+      // Only a range with an explicit endtime can be empty or inverted.
+      // NOTE(review): relies on the scan setup treating endTime == 0 as unbounded - confirm.
+      if (tableHash.endTime != 0 && tableHash.startTime >= tableHash.endTime) {
+        printUsage("Invalid time range filter: starttime="
+            + tableHash.startTime + " >=  endtime=" + tableHash.endTime);
+        return false;
+      }
+      
+    } catch (Exception e) {
+      // e.g. NumberFormatException from a malformed numeric option value
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Command line entry point: runs the tool through ToolRunner and exits the JVM with
+   * the tool's return code.
+   */
+  public static void main(String[] args) throws Exception {
+    final HashTable tool = new HashTable(HBaseConfiguration.create());
+    final int exitCode = ToolRunner.run(tool, args);
+    System.exit(exitCode);
+  }
+
+  /**
+   * Parses the arguments, writes the temporary manifest, runs the hash job and, if the
+   * job succeeds, completes the manifest.
+   *
+   * @return 0 on success, 1 if the arguments were rejected or the job failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    final String[] toolArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+    if (doCommandLine(toolArgs)) {
+      final Job hashJob = createSubmittableJob(toolArgs);
+      writeTempManifestFile();
+      if (hashJob.waitForCompletion(true)) {
+        completeManifest();
+        return 0;
+      }
+      LOG.info("Map-reduce job failed!");
+    }
+    return 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
new file mode 100644
index 0000000..3495ca9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+
+public class SyncTable extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(SyncTable.class);
+  // Job configuration keys: written by createSubmittableJob, read back in SyncMapper.setup.
+  static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
+  static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
+  static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
+  static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
+  static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
+  static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
+  // Tool inputs consumed by createSubmittableJob.
+  Path sourceHashDir;
+  String sourceTableName;
+  String targetTableName;
+  // Cluster keys are optional: createSubmittableJob only copies them to the job conf if non-null.
+  String sourceZkCluster;
+  String targetZkCluster;
+  boolean dryRun;
+  // NOTE(review): not referenced in the visible part of this file - presumably job counters.
+  Counters counters;
+  /** Creates the tool with the given configuration, which is passed on to Configured. */
+  public SyncTable(Configuration conf) {
+    super(conf);
+  }
+  
+  /**
+   * Validates the HashTable output found in the source hash dir and builds the map-only
+   * job that compares the target table against it.
+   * <p>
+   * The job is only created when the manifest's hash file count, the number of partition
+   * keys and the number of hash data outputs found on the file system agree. A table name
+   * that differs from the one recorded in the manifest is only logged as a warning.
+   *
+   * @param args command line arguments (not used by this method)
+   * @return the configured job, not yet submitted
+   * @throws IOException if the source hash dir does not exist or cannot be read
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    final Configuration toolConf = getConf();
+    final FileSystem hashFs = sourceHashDir.getFileSystem(toolConf);
+    if (!hashFs.exists(sourceHashDir)) {
+      throw new IOException("Source hash dir not found: " + sourceHashDir);
+    }
+
+    final HashTable.TableHash manifest = HashTable.TableHash.read(toolConf, sourceHashDir);
+    LOG.info("Read source hash manifest: " + manifest);
+    final int partitionKeyCount = manifest.partitions.size();
+    LOG.info("Read " + partitionKeyCount + " partition keys");
+    if (!manifest.tableName.equals(sourceTableName)) {
+      LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
+          + manifest.tableName + " but job is reading from: " + sourceTableName);
+    }
+    if (manifest.numHashFiles != partitionKeyCount + 1) {
+      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
+          + " should be 1 more than the number of partition keys.  However, the manifest file "
+          + " says numHashFiles=" + manifest.numHashFiles + " but the number of partition keys"
+          + " found in the partitions file is " + partitionKeyCount);
+    }
+
+    // count the hash data outputs that are actually present
+    final Path hashDataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
+    int hashOutputsFound = 0;
+    for (FileStatus entry : hashFs.listStatus(hashDataDir)) {
+      final String entryName = entry.getPath().getName();
+      if (entryName.startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
+        hashOutputsFound++;
+      }
+    }
+    if (hashOutputsFound != manifest.numHashFiles) {
+      throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
+          + " should be 1 more than the number of partition keys.  However, the number of data dirs"
+          + " found is " + hashOutputsFound + " but the number of partition keys"
+          + " found in the partitions file is " + partitionKeyCount);
+    }
+
+    final String defaultJobName = "syncTable_" + sourceTableName + "-" + targetTableName;
+    final Job job = Job.getInstance(toolConf, toolConf.get("mapreduce.job.name", defaultJobName));
+    final Configuration jobConf = job.getConfiguration();
+    job.setJarByClass(HashTable.class);
+
+    // publish the tool inputs to the mapper tasks
+    jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
+    jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
+    jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
+    if (sourceZkCluster != null) {
+      jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
+    }
+    if (targetZkCluster != null) {
+      jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
+    }
+    jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
+
+    TableMapReduceUtil.initTableMapperJob(targetTableName, manifest.initScan(),
+        SyncMapper.class, null, null, job);
+
+    job.setNumReduceTasks(0);
+
+    if (!dryRun) {
+      // There are no reducers; mappers write straight to the table. initTableReducerJob
+      // is only called because it sets up the TableOutputFormat.
+      TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
+          targetZkCluster, null, null);
+
+      // would be nice to add an option for bulk load instead
+    } else {
+      job.setOutputFormatClass(NullOutputFormat.class);
+    }
+
+    return job;
+  }
+  
+  public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
+    Path sourceHashDir; // HashTable output dir, read from SOURCE_HASH_DIR_CONF_KEY in setup
+    // Connections and tables for both sides; opened in setup, closed in cleanup.
+    Connection sourceConnection;
+    Connection targetConnection;
+    Table sourceTable;
+    Table targetTable;
+    boolean dryRun;
+    // Source hash manifest/reader state, plus the hasher fed with the rows scanned by this task.
+    HashTable.TableHash sourceTableHash;
+    HashTable.TableHash.Reader sourceHashReader;
+    ImmutableBytesWritable currentSourceHash;
+    ImmutableBytesWritable nextSourceKey;
+    HashTable.ResultHasher targetHasher;
+    // First failure seen in map or cleanup; cleanup rethrows it after closing the resources.
+    Throwable mapperException;
+    /** Job counters incremented while comparing batches, ranges, rows and cells. */
+    public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
+      SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
+      MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
+    
+    /**
+     * Prepares the mapper: opens connections and tables for the source and target side,
+     * reads the dry-run flag, loads the source hash manifest and positions a hash reader
+     * at the start row of this task's input split.
+     *
+     * @throws IOException if a connection, table or the hash data cannot be opened
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+      
+      Configuration conf = context.getConfiguration();
+      sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
+      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
+      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+      sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
+      targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
+      // The driver publishes the flag under DRY_RUN_CONF_KEY. Reading it from the source
+      // table name key would always yield false unless the table happened to be named "true".
+      dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+      
+      sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
+      LOG.info("Read source hash manifest: " + sourceTableHash);
+      LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
+      
+      TableSplit split = (TableSplit) context.getInputSplit();
+      ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
+      
+      sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
+      findNextKeyHashPair();
+      
+      // create a hasher, but don't start it right away
+      // instead, find the first hash batch at or after the start row
+      // and skip any rows that come before.  they will be caught by the previous task
+      targetHasher = new HashTable.ResultHasher();
+    }
+  
+    private static Connection openConnection(Configuration conf, String zkClusterConfKey)
+      throws IOException {
+        Configuration clusterConf = new Configuration(conf);
+        String zkCluster = conf.get(zkClusterConfKey);
+        if (zkCluster != null) {
+          ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
+        }
+        return ConnectionFactory.createConnection(clusterConf);
+    }
+    
+    private static Table openTable(Connection connection, Configuration conf,
+        String tableNameConfKey) throws IOException {
+      return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
+    }
+    
+    /**
+     * Advances the source hash reader by one entry and stores that entry's key in
+     * nextSourceKey. When the reader has no more entries nextSourceKey becomes null,
+     * meaning the last hash batch extends to the end.
+     */
+    private void findNextKeyHashPair() throws IOException {
+      nextSourceKey = sourceHashReader.next() ? sourceHashReader.getCurrentKey() : null;
+    }
+    
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value, Context context)
+        throws IOException, InterruptedException {
+      try {
+        // first, finish any hash batches that end before the scanned row
+        while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
+          moveToNextBatch(context);
+        }
+        
+        // next, add the scanned row (as long as we've reached the first batch)
+        if (targetHasher.isBatchStarted()) {
+          targetHasher.hashResult(value);
+        }
+      } catch (Throwable t) {
+        mapperException = t;
+        Throwables.propagateIfInstanceOf(t, IOException.class);
+        Throwables.propagateIfInstanceOf(t, InterruptedException.class);
+        Throwables.propagate(t);
+      }
+    }
+
+    /**
+     * If there is an open hash batch, complete it and sync if there are diffs. Then start a new
+     * batch at nextSourceKey, remember its source hash, and read the following key/hash pair.
+     */
+    private void moveToNextBatch(Context context) throws IOException, InterruptedException {
+      if (targetHasher.isBatchStarted()) {
+        finishBatchAndCompareHashes(context);
+      }
+      targetHasher.startBatch(nextSourceKey);
+      currentSourceHash = sourceHashReader.getCurrentHash();
+      // advance the reader only after the current hash has been captured
+      findNextKeyHashPair();
+    }
+
+    /**
+     * Finish the currently open hash batch.
+     * Compare the target hash to the given source hash.
+     * If they do not match, then sync the covered key range.
+     */
+    private void finishBatchAndCompareHashes(Context context)
+        throws IOException, InterruptedException {
+      targetHasher.finishBatch();
+      context.getCounter(Counter.BATCHES).increment(1);
+      if (targetHasher.getBatchSize() == 0) {
+        context.getCounter(Counter.EMPTY_BATCHES).increment(1);
+      }
+      ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
+      if (targetHash.equals(currentSourceHash)) {
+        context.getCounter(Counter.HASHES_MATCHED).increment(1);
+      } else {
+        context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
+        
+        ImmutableBytesWritable stopRow = nextSourceKey == null
+                                          ? new ImmutableBytesWritable(sourceTableHash.stopRow)
+                                          : nextSourceKey;
+        
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Hash mismatch.  Key range: " + toHex(targetHasher.getBatchStartKey())
+              + " to " + toHex(stopRow)
+              + " sourceHash: " + toHex(currentSourceHash)
+              + " targetHash: " + toHex(targetHash));
+        }
+        
+        syncRange(context, targetHasher.getBatchStartKey(), stopRow);
+      }
+    }
+    private static String toHex(ImmutableBytesWritable bytes) {
+      return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
+    }
+    /** Scanner over no results; syncRange passes it for the side that is missing a row. */
+    private static final CellScanner EMPTY_CELL_SCANNER
+      = new CellScanner(Iterators.<Result>emptyIterator());
+    
+    /**
+     * Rescan the given range directly from the source and target tables.
+     * Count and log differences, and if this is not a dry run, output Puts and Deletes
+     * to make the target table match the source table for this range.
+     * <p>
+     * Both scanners are closed when the method exits, including when the comparison fails.
+     *
+     * @param context  task context used for counters and for emitting mutations
+     * @param startRow first row of the range
+     * @param stopRow  stop row of the range, passed to the scan as its stop row
+     */
+    private void syncRange(Context context, ImmutableBytesWritable startRow,
+        ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
+      
+      Scan scan = sourceTableHash.initScan();
+      scan.setStartRow(startRow.copyBytes());
+      scan.setStopRow(stopRow.copyBytes());
+      
+      boolean rangeMatched = true;
+      ResultScanner sourceScanner = sourceTable.getScanner(scan);
+      try {
+        ResultScanner targetScanner = targetTable.getScanner(scan);
+        try {
+          CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
+          CellScanner targetCells = new CellScanner(targetScanner.iterator());
+          
+          // merge-walk both row streams; a null row key means that side is exhausted
+          byte[] nextSourceRow = sourceCells.nextRow();
+          byte[] nextTargetRow = targetCells.nextRow();
+          while(nextSourceRow != null || nextTargetRow != null) {
+            boolean rowMatched;
+            int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
+            if (rowComparison < 0) {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
+              }
+              context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
+              
+              rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
+              nextSourceRow = sourceCells.nextRow();  // advance only source to next row
+            } else if (rowComparison > 0) {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
+              }
+              context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
+              
+              rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
+              nextTargetRow = targetCells.nextRow();  // advance only target to next row
+            } else {
+              // current row is the same on both sides, compare cell by cell
+              rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
+              nextSourceRow = sourceCells.nextRow();
+              nextTargetRow = targetCells.nextRow();
+            }
+            
+            if (!rowMatched) {
+              rangeMatched = false;
+            }
+          }
+        } finally {
+          targetScanner.close();
+        }
+      } finally {
+        // release the scanner even if the comparison above threw
+        sourceScanner.close();
+      }
+      
+      context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
+        .increment(1);
+    }
+    /** Reads an iterator of Results row by row and cell by cell; a row may span several Results. */
+    private static class CellScanner {
+      private final Iterator<Result> results;
+      // position within the row currently being read
+      private byte[] currentRow;
+      private Result currentRowResult;
+      private int nextCellInRow;
+      // first Result of the following row, when nextCellInRow() has already pulled it
+      private Result nextRowResult;
+      // nothing is read from the iterator until nextRow() is called
+      public CellScanner(Iterator<Result> results) {
+        this.results = results;
+      }
+      
+      /**
+       * Advance to the next row and return its row key. Results still unread for the current
+       * row are skipped. Returns null iff there are no more rows.
+       */
+      public byte[] nextRow() {
+        if (nextRowResult == null) {
+          // no cached row - check scanner for more
+          while (results.hasNext()) {
+            nextRowResult = results.next();
+            Cell nextCell = nextRowResult.rawCells()[0];
+            if (currentRow == null
+                || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
+                nextCell.getRowOffset(), nextCell.getRowLength())) {
+              // found next row
+              break;
+            } else {
+              // found another result from current row, keep scanning
+              nextRowResult = null;
+            }
+          }
+          
+          if (nextRowResult == null) {
+            // end of data, no more rows
+            currentRowResult = null;
+            currentRow = null;
+            return null;
+          }
+        }
+        
+        // advance to cached result for next row
+        currentRowResult = nextRowResult;
+        nextCellInRow = 0;
+        currentRow = currentRowResult.getRow();
+        nextRowResult = null;
+        return currentRow;
+      }
+      
+      /**
+       * Returns the next Cell in the current row or null iff none remain; may read ahead one Result.
+       */
+      public Cell nextCellInRow() {
+        if (currentRowResult == null) {
+          // nothing left in current row
+          return null;
+        }
+        // take the cell to return, then look ahead once the current Result is used up
+        Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
+        nextCellInRow++;
+        if (nextCellInRow == currentRowResult.size()) {
+          if (results.hasNext()) {
+            Result result = results.next();
+            Cell cell = result.rawCells()[0];
+            if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
+                cell.getRowOffset(), cell.getRowLength())) {
+              // result is part of current row
+              currentRowResult = result;
+              nextCellInRow = 0;
+            } else {
+              // result is part of next row, cache it
+              nextRowResult = result;
+              // current row is complete
+              currentRowResult = null;
+            }
+          } else {
+            // end of data
+            currentRowResult = null;
+          }
+        }
+        return nextCell;
+      }
+    }
+       
+    /**
+     * Compare the cells for the given row from the source and target tables.
+     * Count and log any differences.
+     * If not a dry run, output a Put and/or Delete needed to sync the target table
+     * to match the source table. Returns true iff no difference was found in the row.
+     */
+    private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
+        CellScanner targetCells) throws IOException, InterruptedException {
+      Put put = null;
+      Delete delete = null;
+      long matchingCells = 0;
+      boolean matchingRow = true;
+      Cell sourceCell = sourceCells.nextCellInRow();
+      Cell targetCell = targetCells.nextCellInRow();
+      while (sourceCell != null || targetCell != null) {
+        // merge-walk both cell streams; a null cell compares as after any real cell
+        int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
+        if (cellKeyComparison < 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Target missing cell: " + sourceCell);
+          }
+          context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
+          matchingRow = false;
+          // queue the source cell to be written to the target
+          if (!dryRun) {
+            if (put == null) {
+              put = new Put(rowKey);
+            }
+            put.add(sourceCell);
+          }
+          
+          sourceCell = sourceCells.nextCellInRow();
+        } else if (cellKeyComparison > 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Source missing cell: " + targetCell);
+          }
+          context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
+          matchingRow = false;
+          // queue removal of the target cell that has no counterpart on the source
+          if (!dryRun) {
+            if (delete == null) {
+              delete = new Delete(rowKey);
+            }
+            // add a tombstone to exactly match the target cell that is missing on the source
+            delete.addColumn(CellUtil.cloneFamily(targetCell),
+                CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
+          }
+          
+          targetCell = targetCells.nextCellInRow();
+        } else {
+          // the cell keys are equal, now check values
+          if (CellUtil.matchingValue(sourceCell, targetCell)) {
+            matchingCells++;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Different values: ");
+              LOG.debug("  source cell: " + sourceCell
+                  + " value: " + Bytes.toHex(sourceCell.getValueArray(),
+                      sourceCell.getValueOffset(), sourceCell.getValueLength()));
+              LOG.debug("  target cell: " + targetCell
+                  + " value: " + Bytes.toHex(targetCell.getValueArray(),
+                      targetCell.getValueOffset(), targetCell.getValueLength()));
+            }
+            context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
+            matchingRow = false;
+            
+            if (!dryRun) {
+              // overwrite target cell
+              if (put == null) {
+                put = new Put(rowKey);
+              }
+              put.add(sourceCell);
+            }
+          }
+          sourceCell = sourceCells.nextCellInRow();
+          targetCell = targetCells.nextCellInRow();
+        }
+        // if the source hash used a scan batch size, emit mutations once they reach that size
+        if (!dryRun && sourceTableHash.scanBatch > 0) {
+          if (put != null && put.size() >= sourceTableHash.scanBatch) {
+            context.write(new ImmutableBytesWritable(rowKey), put);
+            put = null;
+          }
+          if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
+            context.write(new ImmutableBytesWritable(rowKey), delete);
+            delete = null;
+          }
+        }
+      }
+      // emit whatever is still pending for this row
+      if (!dryRun) {
+        if (put != null) {
+          context.write(new ImmutableBytesWritable(rowKey), put);
+        }
+        if (delete != null) {
+          context.write(new ImmutableBytesWritable(rowKey), delete);
+        }
+      }
+      
+      if (matchingCells > 0) {
+        context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
+      }
+      if (matchingRow) {
+        context.getCounter(Counter.MATCHINGROWS).increment(1);
+        return true;
+      } else {
+        context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
+        return false;
+      }
+    }
+
+    private static final CellComparator cellComparator = new CellComparator();
+
+    /**
+     * Orders two row keys given as byte arrays. Either key may be null, which marks its
+     * side as having no row; a null key sorts after every non-null key.
+     */
+    private static int compareRowKeys(byte[] r1, byte[] r2) {
+      if (r1 == null) {
+        // source missing row
+        return 1;
+      }
+      if (r2 == null) {
+        // target missing row
+        return -1;
+      }
+      return cellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
+    }
+
+    /**
+     * Compare families, qualifiers, and timestamps of the given Cells.
+     * They are assumed to be of the same row.
+     * Nulls are after non-nulls.
+     */
+     private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
+      if (c1 == null) {
+        return 1; // source missing cell
+      }
+      if (c2 == null) {
+        return -1; // target missing cell
+      }
+      
+      int result = CellComparator.compareFamilies(c1, c2);
+      if (result != 0) {
+        return result;
+      }
+      
+      result = CellComparator.compareQualifiers(c1, c2);
+      if (result != 0) {
+        return result;
+      }
+      
+      // note timestamp comparison is inverted - more recent cells first
+      return CellComparator.compareTimestamps(c1, c2);
+    }
+     
+    /**
+     * Finishes the hash ranges still pending for this split (unless the mapper has already
+     * failed), closes both tables and both connections, and finally rethrows the first
+     * failure that was recorded.
+     * <p>
+     * Each resource is closed independently, so a failure while closing one of them does
+     * not prevent the others from being closed.
+     */
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      if (mapperException == null) {
+        try {
+          finishRemainingHashRanges(context);
+        } catch (Throwable t) {
+          mapperException = t;
+        }
+      }
+
+      closeAndRecord(sourceTable);
+      closeAndRecord(targetTable);
+      closeAndRecord(sourceConnection);
+      closeAndRecord(targetConnection);
+
+      // propagate first exception
+      if (mapperException != null) {
+        Throwables.propagateIfInstanceOf(mapperException, IOException.class);
+        Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
+        Throwables.propagate(mapperException);
+      }
+    }
+
+    /**
+     * Closes the given resource if it is not null. A failure becomes mapperException when
+     * no earlier failure was recorded; otherwise it is only logged.
+     */
+    private void closeAndRecord(java.io.Closeable resource) {
+      if (resource == null) {
+        return;
+      }
+      try {
+        resource.close();
+      } catch (Throwable t) {
+        if (mapperException == null) {
+          mapperException = t;
+        } else {
+          LOG.error("Suppressing exception from closing tables", t);
+        }
+      }
+    }
+
+    /**
+     * Completes the hash batches that belong to this split after all of its rows have
+     * been mapped.
+     * <p>
+     * First moves through every remaining source batch that begins before the end of the
+     * split (or all remaining batches when the split ends at the end of the table). If a
+     * batch is then still open and its range extends past the split end row, the rest of
+     * that range is scanned from the target table and hashed before the batch is finished
+     * and compared.
+     */
+    private void finishRemainingHashRanges(Context context) throws IOException,
+        InterruptedException {
+      TableSplit split = (TableSplit) context.getInputSplit();
+      byte[] splitEndRow = split.getEndRow();
+      boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
+
+      // if there are more hash batches that begin before the end of this split move to them
+      while (nextSourceKey != null
+          && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
+        moveToNextBatch(context);
+      }
+      
+      if (targetHasher.isBatchStarted()) {
+        // need to complete the final open hash batch
+
+        if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
+              || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
+          // the open hash range continues past the end of this region
+          // add a scan to complete the current hash range
+          Scan scan = sourceTableHash.initScan();
+          scan.setStartRow(splitEndRow);
+          if (nextSourceKey == null) {
+            scan.setStopRow(sourceTableHash.stopRow);
+          } else {
+            scan.setStopRow(nextSourceKey.copyBytes());
+          }
+          
+          ResultScanner targetScanner = targetTable.getScanner(scan);
+          try {
+            for (Result row : targetScanner) {
+              targetHasher.hashResult(row);
+            }
+          } finally {
+            // the scanner is local to this method, so it has to be released here
+            targetScanner.close();
+          }
+        } // else current batch ends exactly at split end row
+
+        finishBatchAndCompareHashes(context);
+      }
+    }
+  }
+  
+  /** Number of mandatory positional arguments: sourcehashdir, sourcetable, targettable. */
+  private static final int NUM_ARGS = 3;
+
+  /**
+   * Prints the command line help for SyncTable to standard error.
+   *
+   * @param errorMsg optional error to print ahead of the usage text; skipped when
+   *          {@code null} or empty
+   */
+  private static void printUsage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+      System.err.println();
+    }
+    System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
+    System.err.println();
+    System.err.println("Options:");
+
+    System.err.println(" sourcezkcluster  ZK cluster key of the source table");
+    System.err.println("                  (defaults to cluster in classpath's config)");
+    System.err.println(" targetzkcluster  ZK cluster key of the target table");
+    System.err.println("                  (defaults to cluster in classpath's config)");
+    System.err.println(" dryrun           if true, output counters but no writes");
+    System.err.println("                  (defaults to false)");
+    System.err.println();
+    System.err.println("Args:");
+    System.err.println(" sourcehashdir    path to HashTable output dir for source table");
+    // sourcehashdir is a required positional argument, so point at the tool that creates it
+    // rather than describing a "not specified" case that the argument parser never accepts
+    System.err.println("                  (see org.apache.hadoop.hbase.mapreduce.HashTable)");
+    System.err.println(" sourcetable      Name of the source table to sync from");
+    System.err.println(" targettable      Name of the target table to sync to");
+    System.err.println();
+    System.err.println("Examples:");
+    System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
+    System.err.println(" to a local target cluster:");
+    System.err.println(" $ bin/hbase " +
+        "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
+        + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
+        + " hdfs://nn:9000/hashes/tableA tableA tableA");
+  }
+  
+  /**
+   * Parses the command line into this tool's fields.
+   * <p>
+   * The last three arguments are taken as the source hash directory, the source table name
+   * and the target table name; everything before them must be a recognised option.
+   *
+   * @param args the arguments left over after generic option parsing
+   * @return {@code true} when the arguments were accepted, {@code false} when usage was
+   *         printed (too few arguments, help requested, unknown option, or a parse failure)
+   */
+  private boolean doCommandLine(final String[] args) {
+    if (args.length < NUM_ARGS) {
+      printUsage(null);
+      return false;
+    }
+    try {
+      final int positionalStart = args.length - NUM_ARGS;
+      sourceHashDir = new Path(args[positionalStart]);
+      sourceTableName = args[positionalStart + 1];
+      targetTableName = args[positionalStart + 2];
+
+      final String sourceZkPrefix = "--sourcezkcluster=";
+      final String targetZkPrefix = "--targetzkcluster=";
+      final String dryRunPrefix = "--dryrun=";
+
+      // everything ahead of the positional arguments is treated as an option
+      for (int idx = 0; idx < positionalStart; idx++) {
+        final String option = args[idx];
+
+        if (option.equals("-h") || option.startsWith("--h")) {
+          printUsage(null);
+          return false;
+        }
+
+        if (option.startsWith(sourceZkPrefix)) {
+          sourceZkCluster = option.substring(sourceZkPrefix.length());
+        } else if (option.startsWith(targetZkPrefix)) {
+          targetZkCluster = option.substring(targetZkPrefix.length());
+        } else if (option.startsWith(dryRunPrefix)) {
+          dryRun = Boolean.parseBoolean(option.substring(dryRunPrefix.length()));
+        } else {
+          printUsage("Invalid argument '" + option + "'");
+          return false;
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      printUsage("Can't start because " + e.getMessage());
+      return false;
+    }
+    return true;
+  }
+  
+  /**
+   * Main entry point.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  /**
+   * Parses the arguments, submits the job and waits for it to finish.
+   *
+   * @param args command line arguments, including any generic Hadoop options
+   * @return 0 when the job completed successfully, 1 when the arguments were rejected or
+   *         the job failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    // strip the generic Hadoop options first; only the remainder belongs to this tool
+    final String[] toolArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+    if (!doCommandLine(toolArgs)) {
+      return 1;
+    }
+
+    final Job job = createSubmittableJob(toolArgs);
+    final boolean succeeded = job.waitForCompletion(true);
+    if (succeeded) {
+      // counters are only recorded for a successful job
+      counters = job.getCounters();
+      return 0;
+    }
+    LOG.info("Map-reduce job failed!");
+    return 1;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ae121ee7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
new file mode 100644
index 0000000..762f530
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapFile;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Basic test for the HashTable M/R tool
+ */
+@Category(LargeTests.class)
+public class TestHashTable {
+
+  private static final Log LOG = LogFactory.getLog(TestHashTable.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /** Starts a three-node mini HBase cluster and a mini MapReduce cluster for the test. */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  /** Shuts the mini clusters down in the reverse order of their start-up. */
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Loads a pre-split table with 100 rows keyed by int, runs HashTable over it and checks
+   * both the manifest written to the output directory and the batch hashes stored in the
+   * data files against known values.
+   */
+  @Test
+  public void testHashTable() throws Exception {
+    final String tableName = "testHashTable";
+    final byte[] family = Bytes.toBytes("family");
+    final byte[] column1 = Bytes.toBytes("c1");
+    final byte[] column2 = Bytes.toBytes("c2");
+    final byte[] column3 = Bytes.toBytes("c3");
+
+    int numRows = 100;
+    int numRegions = 10;
+    int numHashFiles = 3;
+
+    // evenly spaced split points so the rows are spread over numRegions regions
+    byte[][] splitRows = new byte[numRegions-1][];
+    for (int i = 1; i < numRegions; i++) {
+      splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
+    }
+
+    // fixed timestamp; presumably so the hashes are reproducible across runs
+    long timestamp = 1430764183454L;
+    // put rows into the first table
+    HTable t1 = TEST_UTIL.createTable(TableName.valueOf(tableName), family, splitRows);
+    try {
+      for (int i = 0; i < numRows; i++) {
+        Put p = new Put(Bytes.toBytes(i), timestamp);
+        p.addColumn(family, column1, column1);
+        p.addColumn(family, column2, column2);
+        p.addColumn(family, column3, column3);
+        t1.put(p);
+      }
+    } finally {
+      // close the table even when a put fails
+      t1.close();
+    }
+
+    HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
+
+    Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName);
+
+    long batchSize = 300;
+    int code = hashTable.run(new String[] {
+        "--batchsize=" + batchSize,
+        "--numhashfiles=" + numHashFiles,
+        "--scanbatch=2",
+        tableName,
+        testDir.toString()});
+    assertEquals("test job failed", 0, code);
+
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+    // the manifest must reflect the options the job was run with
+    HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
+    assertEquals(tableName, tableHash.tableName);
+    assertEquals(batchSize, tableHash.batchSize);
+    assertEquals(numHashFiles, tableHash.numHashFiles);
+    assertEquals(numHashFiles - 1, tableHash.partitions.size());
+    for (ImmutableBytesWritable bytes : tableHash.partitions) {
+      LOG.debug("partition: " + Bytes.toInt(bytes.get()));
+    }
+
+    // expected hash per batch, keyed by the batch's start row; -1 stands for the empty key
+    ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes
+      = ImmutableMap.<Integer, ImmutableBytesWritable>builder()
+      .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
+      .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
+      .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
+      .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
+      .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
+      .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
+      .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
+      .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
+      .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
+      .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
+      .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
+      .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
+      .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
+      .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
+      .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
+      .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
+      .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
+      .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
+      .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
+      .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
+      .build();
+
+    // collect the hashes from every data file, failing on a key that shows up twice
+    Map<Integer, ImmutableBytesWritable> actualHashes
+      = new HashMap<Integer, ImmutableBytesWritable>();
+    Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
+    for (int i = 0; i < numHashFiles; i++) {
+      Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
+
+      MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
+      try {
+        ImmutableBytesWritable key = new ImmutableBytesWritable();
+        ImmutableBytesWritable hash = new ImmutableBytesWritable();
+        while(reader.next(key, hash)) {
+          String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
+          LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
+              + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
+
+          // an empty key is recorded as -1 so it can be used as a map key
+          int intKey = -1;
+          if (key.getLength() > 0) {
+            intKey = Bytes.toInt(key.get(),  key.getOffset(), key.getLength());
+          }
+          if (actualHashes.containsKey(intKey)) {
+            Assert.fail("duplicate key in data files: " + intKey);
+          }
+          // copy the bytes: the same writable instance is refilled by every next() call
+          actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
+        }
+      } finally {
+        // close the reader even when an assertion or read error interrupts the loop
+        reader.close();
+      }
+    }
+
+    FileStatus[] files = fs.listStatus(testDir);
+    for (FileStatus file : files) {
+      LOG.debug("Output file: " + file.getPath());
+    }
+
+    files = fs.listStatus(dataDir);
+    for (FileStatus file : files) {
+      LOG.debug("Data file: " + file.getPath());
+    }
+
+    // log a readable diff before the assertion, whose own message is hard to scan
+    if (!expectedHashes.equals(actualHashes)) {
+      LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
+    }
+    Assert.assertEquals(expectedHashes, actualHashes);
+
+    TEST_UTIL.deleteTable(tableName);
+    TEST_UTIL.cleanupDataTestDirOnTestFS();
+  }
+
+
+}