Posted to commits@hbase.apache.org by ap...@apache.org on 2015/06/13 03:24:40 UTC
[5/5] hbase git commit: HBASE-13639 SyncTable - rsync for HBase tables
HBASE-13639 SyncTable - rsync for HBase tables
Signed-off-by: Andrew Purtell <ap...@apache.org>
Amending-Author: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61c9a250
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61c9a250
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61c9a250
Branch: refs/heads/0.98
Commit: 61c9a25010f98133cae702d565ca5dc5a2b8b50b
Parents: 3f31327
Author: Dave Latham <da...@yahoo-inc.com>
Authored: Fri Jun 12 16:00:02 2015 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Jun 12 16:00:02 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/CellComparator.java | 63 ++
.../java/org/apache/hadoop/hbase/CellUtil.java | 5 +
.../org/apache/hadoop/hbase/util/Bytes.java | 54 +-
.../org/apache/hadoop/hbase/util/TestBytes.java | 48 ++
.../hadoop/hbase/mapreduce/HashTable.java | 756 ++++++++++++++++++
.../hadoop/hbase/mapreduce/SyncTable.java | 772 +++++++++++++++++++
.../hadoop/hbase/mapreduce/TestHashTable.java | 192 +++++
.../hadoop/hbase/mapreduce/TestSyncTable.java | 335 ++++++++
8 files changed, 2213 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
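For orientation, the two new tools are meant to run as a pair: HashTable writes batched hashes of the source table to a filesystem directory, and SyncTable re-hashes the same key ranges against the target cluster and emits Puts/Deletes for any range whose hash differs. Below is a minimal sketch of driving both tools programmatically, mirroring their main() methods; the table name, output path, ZooKeeper cluster key, and the HashThenSync class itself are illustrative placeholders, not values from this commit.

    // Sketch only - not part of this patch. Table/path/ZK values are placeholders.
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HashTable;
    import org.apache.hadoop.hbase.mapreduce.SyncTable;
    import org.apache.hadoop.util.ToolRunner;

    public class HashThenSync {
      public static void main(String[] args) throws Exception {
        // 1. On the source cluster: hash 'TestTable' in 32kB batches into /hashes/testTable
        int hashExit = ToolRunner.run(new HashTable(HBaseConfiguration.create()),
            new String[] { "--batchsize=32000", "TestTable", "/hashes/testTable" });

        // 2. Against the target cluster: compare ranges to the stored hashes and sync diffs.
        //    With --dryrun=true only counters are reported and no writes are made.
        int syncExit = ToolRunner.run(new SyncTable(HBaseConfiguration.create()),
            new String[] { "--dryrun=true",
                "--sourcezkcluster=zk1.example.com:2181:/hbase",
                "hdfs://nn:9000/hashes/testTable", "TestTable", "TestTable" });

        System.exit(hashExit != 0 ? hashExit : syncExit);
      }
    }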
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index 245b181..e56f25e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -135,6 +135,50 @@ public class CellComparator implements Comparator<Cell>, Serializable{
return a.getTypeByte() == b.getTypeByte();
}
+ public static int compareFamilies(Cell left, Cell right) {
+ return Bytes.compareTo(left.getFamilyArray(), left.getFamilyOffset(), left.getFamilyLength(),
+ right.getFamilyArray(), right.getFamilyOffset(), right.getFamilyLength());
+ }
+
+ public static int compareQualifiers(Cell left, Cell right) {
+ return Bytes.compareTo(left.getQualifierArray(), left.getQualifierOffset(),
+ left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
+ right.getQualifierLength());
+ }
+
+ /**
+ * Do not use for comparing rows from hbase:meta. Meta table Cells have schema (table,startrow,hash)
+ * so can't be treated as plain byte arrays as this method does.
+ */
+ public static int compareRows(final Cell left, final Cell right) {
+ return Bytes.compareTo(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
+ right.getRowArray(), right.getRowOffset(), right.getRowLength());
+ }
+
+ /**
+ * Do not use for comparing rows from hbase:meta. Meta table Cells have schema (table,startrow,hash)
+ * so can't be treated as plain byte arrays as this method does.
+ */
+ public static int compareRows(byte[] left, int loffset, int llength, byte[] right, int roffset,
+ int rlength) {
+ return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
+ }
+
+ /**
+ * Compares cell's timestamps in DESCENDING order.
+ * The below older timestamps sorting ahead of newer timestamps looks
+ * wrong but it is intentional. This way, newer timestamps are first
+ * found when we iterate over a memstore and newer versions are the
+ * first we trip over when reading from a store file.
+ * @return 1 if left's timestamp < right's timestamp
+ * -1 if left's timestamp > right's timestamp
+ * 0 if both timestamps are equal
+ */
+ public static int compareTimestamps(final Cell left, final Cell right) {
+ long ltimestamp = left.getTimestamp();
+ long rtimestamp = right.getTimestamp();
+ return compareTimestamps(ltimestamp, rtimestamp);
+ }
/********************* hashCode ************************/
@@ -235,4 +279,23 @@ public class CellComparator implements Comparator<Cell>, Serializable{
return 0 == compareStaticIgnoreMvccVersion(a, b);
}
+ /**
+ * Compares timestamps in DESCENDING order.
+ * The below older timestamps sorting ahead of newer timestamps looks
+ * wrong but it is intentional. This way, newer timestamps are first
+ * found when we iterate over a memstore and newer versions are the
+ * first we trip over when reading from a store file.
+ * @return 1 if left timestamp < right timestamp
+ * -1 if left timestamp > right timestamp
+ * 0 if both timestamps are equal
+ */
+ static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
+ if (ltimestamp < rtimestamp) {
+ return 1;
+ } else if (ltimestamp > rtimestamp) {
+ return -1;
+ }
+ return 0;
+ }
+
}
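The comparator helpers added above preserve HBase's descending timestamp ordering: the cell with the newer timestamp compares as smaller and therefore sorts first. A small sketch, not part of the patch (the row/family/qualifier values are arbitrary):

    // Sketch only - illustrates the methods added to CellComparator above.
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TimestampOrderExample {
      public static void main(String[] args) {
        byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f");
        byte[] qual = Bytes.toBytes("q"), val = Bytes.toBytes("v");
        KeyValue older = new KeyValue(row, fam, qual, 1L, val);
        KeyValue newer = new KeyValue(row, fam, qual, 2L, val);

        // the newer timestamp compares as "smaller", so it sorts ahead of the older cell
        System.out.println(CellComparator.compareTimestamps(newer, older)); // -1
        System.out.println(CellComparator.compareTimestamps(older, newer)); // 1
        // same coordinates otherwise, so the family/qualifier comparison is 0
        System.out.println(CellComparator.compareQualifiers(older, newer)); // 0
      }
    }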
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 7c82c6c..cdfc72b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -364,6 +364,11 @@ public final class CellUtil {
return Bytes.equals(left.getValueArray(), left.getValueOffset(), left.getValueLength(),
buf, 0, buf.length);
}
+
+ public static boolean matchingTimestamp(Cell a, Cell b) {
+ return CellComparator.compareTimestamps(a.getTimestamp(), b.getTimestamp()) == 0;
+ }
+
/**
* @return True if a delete type, a {@link KeyValue.Type#Delete} or
* a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
index 0dc18b1..6e548ae 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
@@ -431,7 +431,7 @@ public class Bytes {
if (off + len > b.length) len = b.length - off;
for (int i = off; i < off + len ; ++i ) {
int ch = b[i] & 0xFF;
- if ( (ch >= '0' && ch <= '9')
+ if ((ch >= '0' && ch <= '9')
|| (ch >= 'A' && ch <= 'Z')
|| (ch >= 'a' && ch <= 'z')
|| " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
@@ -2276,14 +2276,47 @@ public class Bytes {
}
return result;
}
-
+
+ private static final char[] HEX_CHARS = {
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
+ };
+
+ /**
+ * Convert a byte range into a hex string
+ */
+ public static String toHex(byte[] b, int offset, int length) {
+ checkArgument(length <= Integer.MAX_VALUE / 2);
+ int numChars = length * 2;
+ char[] ch = new char[numChars];
+ for (int i = 0; i < numChars; i += 2)
+ {
+ byte d = b[offset + i/2];
+ ch[i] = HEX_CHARS[(d >> 4) & 0x0F];
+ ch[i+1] = HEX_CHARS[d & 0x0F];
+ }
+ return new String(ch);
+ }
+
/**
* Convert a byte array into a hex string
- * @param b
*/
public static String toHex(byte[] b) {
- checkArgument(b.length > 0, "length must be greater than 0");
- return String.format("%x", new BigInteger(1, b));
+ return toHex(b, 0, b.length);
+ }
+
+ private static int hexCharToNibble(char ch) {
+ if (ch <= '9' && ch >= '0') {
+ return ch - '0';
+ } else if (ch >= 'a' && ch <= 'f') {
+ return ch - 'a' + 10;
+ } else if (ch >= 'A' && ch <= 'F') {
+ return ch - 'A' + 10;
+ }
+ throw new IllegalArgumentException("Invalid hex char: " + ch);
+ }
+
+ private static byte hexCharsToByte(char c1, char c2) {
+ return (byte) ((hexCharToNibble(c1) << 4) | hexCharToNibble(c2));
}
/**
@@ -2292,14 +2325,11 @@ public class Bytes {
* @param hex
*/
public static byte[] fromHex(String hex) {
- checkArgument(hex.length() > 0, "length must be greater than 0");
checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2");
- // Make sure letters are upper case
- hex = hex.toUpperCase();
- byte[] b = new byte[hex.length() / 2];
- for (int i = 0; i < b.length; i++) {
- b[i] = (byte)((toBinaryFromHex((byte)hex.charAt(2 * i)) << 4) +
- toBinaryFromHex((byte)hex.charAt((2 * i + 1))));
+ int len = hex.length();
+ byte[] b = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ b[i / 2] = hexCharsToByte(hex.charAt(i),hex.charAt(i+1));
}
return b;
}
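The reworked hex helpers round-trip cleanly: fromHex accepts either letter case and now also empty input, toHex emits lower case, and the new overload encodes a sub-range. A quick sketch, not part of the patch:

    // Sketch only - exercises Bytes.toHex/fromHex as rewritten above.
    import org.apache.hadoop.hbase.util.Bytes;

    public class HexRoundTrip {
      public static void main(String[] args) {
        byte[] data = Bytes.fromHex("A0ff12");        // mixed case is accepted
        System.out.println(Bytes.toHex(data));        // "a0ff12"
        System.out.println(Bytes.toHex(data, 1, 2));  // "ff12" - only bytes 1..2
        System.out.println(Bytes.toHex(new byte[0])); // "" - empty input is now legal
        // odd-length or non-hex input still throws IllegalArgumentException
      }
    }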
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
index 75baa03..3ec0afb 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
@@ -24,7 +24,9 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Random;
import junit.framework.TestCase;
@@ -501,5 +503,51 @@ public class TestBytes extends TestCase {
Assert.assertEquals(i, b[i]);
}
}
+
+ public void testToFromHex() {
+ List<String> testStrings = new ArrayList<String>();
+ testStrings.addAll(Arrays.asList(new String[] {
+ "",
+ "00",
+ "A0",
+ "ff",
+ "FFffFFFFFFFFFF",
+ "12",
+ "0123456789abcdef",
+ "283462839463924623984692834692346ABCDFEDDCA0",
+ }));
+ for (String testString : testStrings)
+ {
+ byte[] byteData = Bytes.fromHex(testString);
+ Assert.assertEquals(testString.length() / 2, byteData.length);
+ String result = Bytes.toHex(byteData);
+ Assert.assertTrue(testString.equalsIgnoreCase(result));
+ }
+
+ List<byte[]> testByteData = new ArrayList<byte[]>();
+ testByteData.addAll(Arrays.asList(new byte[][] {
+ new byte[0],
+ new byte[1],
+ new byte[10],
+ new byte[] {1, 2, 3, 4, 5},
+ new byte[] {(byte) 0xFF},
+ }));
+ Random r = new Random();
+ for (int i = 0; i < 20; i++)
+ {
+
+ byte[] bytes = new byte[r.nextInt(100)];
+ r.nextBytes(bytes);
+ testByteData.add(bytes);
+ }
+
+ for (byte[] testData : testByteData)
+ {
+ String hexString = Bytes.toHex(testData);
+ Assert.assertEquals(testData.length * 2, hexString.length());
+ byte[] result = Bytes.fromHex(hexString);
+ Assert.assertArrayEquals(testData, result);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
new file mode 100644
index 0000000..eea1614
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -0,0 +1,756 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Ordering;
+
+public class HashTable extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(HashTable.class);
+
+ private static final int DEFAULT_BATCH_SIZE = 8000;
+
+ private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
+ final static String PARTITIONS_FILE_NAME = "partitions";
+ final static String MANIFEST_FILE_NAME = "manifest";
+ final static String HASH_DATA_DIR = "hashes";
+ final static String OUTPUT_DATA_FILE_PREFIX = "part-r-";
+ private final static String TMP_MANIFEST_FILE_NAME = "manifest.tmp";
+
+ TableHash tableHash = new TableHash();
+ Path destPath;
+
+ public HashTable(Configuration conf) {
+ super(conf);
+ }
+
+ public static class TableHash {
+
+ Path hashDir;
+
+ String tableName;
+ String families = null;
+ long batchSize = DEFAULT_BATCH_SIZE;
+ int numHashFiles = 0;
+ byte[] startRow = HConstants.EMPTY_START_ROW;
+ byte[] stopRow = HConstants.EMPTY_END_ROW;
+ int scanBatch = 0;
+ int versions = -1;
+ long startTime = 0;
+ long endTime = 0;
+
+ List<ImmutableBytesWritable> partitions;
+
+ public static TableHash read(Configuration conf, Path hashDir) throws IOException {
+ TableHash tableHash = new TableHash();
+ FileSystem fs = hashDir.getFileSystem(conf);
+ tableHash.hashDir = hashDir;
+ tableHash.readPropertiesFile(fs, new Path(hashDir, MANIFEST_FILE_NAME));
+ tableHash.readPartitionFile(fs, conf, new Path(hashDir, PARTITIONS_FILE_NAME));
+ return tableHash;
+ }
+
+ void writePropertiesFile(FileSystem fs, Path path) throws IOException {
+ Properties p = new Properties();
+ p.setProperty("table", tableName);
+ if (families != null) {
+ p.setProperty("columnFamilies", families);
+ }
+ p.setProperty("targetBatchSize", Long.toString(batchSize));
+ p.setProperty("numHashFiles", Integer.toString(numHashFiles));
+ if (!isTableStartRow(startRow)) {
+ p.setProperty("startRowHex", Bytes.toHex(startRow));
+ }
+ if (!isTableEndRow(stopRow)) {
+ p.setProperty("stopRowHex", Bytes.toHex(stopRow));
+ }
+ if (scanBatch > 0) {
+ p.setProperty("scanBatch", Integer.toString(scanBatch));
+ }
+ if (versions >= 0) {
+ p.setProperty("versions", Integer.toString(versions));
+ }
+ if (startTime != 0) {
+ p.setProperty("startTimestamp", Long.toString(startTime));
+ }
+ if (endTime != 0) {
+ p.setProperty("endTimestamp", Long.toString(endTime));
+ }
+
+ FSDataOutputStream out = fs.create(path);
+ p.store(new OutputStreamWriter(out, Charsets.UTF_8), null);
+ out.close();
+ }
+
+ void readPropertiesFile(FileSystem fs, Path path) throws IOException {
+ FSDataInputStream in = fs.open(path);
+ Properties p = new Properties();
+ p.load(new InputStreamReader(in, Charsets.UTF_8));
+ in.close();
+
+ tableName = p.getProperty("table");
+ families = p.getProperty("columnFamilies");
+ batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
+ numHashFiles = Integer.parseInt(p.getProperty("numHashFiles"));
+
+ String startRowHex = p.getProperty("startRowHex");
+ if (startRowHex != null) {
+ startRow = Bytes.fromHex(startRowHex);
+ }
+ String stopRowHex = p.getProperty("stopRowHex");
+ if (stopRowHex != null) {
+ stopRow = Bytes.fromHex(stopRowHex);
+ }
+
+ String scanBatchString = p.getProperty("scanBatch");
+ if (scanBatchString != null) {
+ scanBatch = Integer.parseInt(scanBatchString);
+ }
+
+ String versionString = p.getProperty("versions");
+ if (versionString != null) {
+ versions = Integer.parseInt(versionString);
+ }
+
+ String startTimeString = p.getProperty("startTimestamp");
+ if (startTimeString != null) {
+ startTime = Long.parseLong(startTimeString);
+ }
+
+ String endTimeString = p.getProperty("endTimestamp");
+ if (endTimeString != null) {
+ endTime = Long.parseLong(endTimeString);
+ }
+ }
+
+ Scan initScan() throws IOException {
+ Scan scan = new Scan();
+ scan.setCacheBlocks(false);
+ if (startTime != 0 || endTime != 0) {
+ scan.setTimeRange(startTime, endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
+ }
+ if (scanBatch > 0) {
+ scan.setBatch(scanBatch);
+ }
+ if (versions >= 0) {
+ scan.setMaxVersions(versions);
+ }
+ if (!isTableStartRow(startRow)) {
+ scan.setStartRow(startRow);
+ }
+ if (!isTableEndRow(stopRow)) {
+ scan.setStopRow(stopRow);
+ }
+ if(families != null) {
+ for(String fam : families.split(",")) {
+ scan.addFamily(Bytes.toBytes(fam));
+ }
+ }
+ return scan;
+ }
+
+ /**
+ * Choose partitions between row ranges to hash to a single output file.
+ * Selects region boundaries that fall within the scan range, and groups them
+ * into the desired number of partitions.
+ */
+ void selectPartitions(Pair<byte[][], byte[][]> regionStartEndKeys) {
+ List<byte[]> startKeys = new ArrayList<byte[]>();
+ for (int i = 0; i < regionStartEndKeys.getFirst().length; i++) {
+ byte[] regionStartKey = regionStartEndKeys.getFirst()[i];
+ byte[] regionEndKey = regionStartEndKeys.getSecond()[i];
+
+ // skip this region if the scan starts at or after its end, or stops at or before its start
+ // in other words:
+ // IF (scan begins before the end of this region
+ // AND scan ends after the start of this region)
+ // THEN include this region
+ if ((isTableStartRow(startRow) || isTableEndRow(regionEndKey)
+ || Bytes.compareTo(startRow, regionEndKey) < 0)
+ && (isTableEndRow(stopRow) || isTableStartRow(regionStartKey)
+ || Bytes.compareTo(stopRow, regionStartKey) > 0)) {
+ startKeys.add(regionStartKey);
+ }
+ }
+
+ int numRegions = startKeys.size();
+ if (numHashFiles == 0) {
+ numHashFiles = numRegions / 100;
+ }
+ if (numHashFiles == 0) {
+ numHashFiles = 1;
+ }
+ if (numHashFiles > numRegions) {
+ // can't partition within regions
+ numHashFiles = numRegions;
+ }
+
+ // choose a subset of start keys to group regions into ranges
+ partitions = new ArrayList<ImmutableBytesWritable>(numHashFiles - 1);
+ // skip the first start key as it is not a partition between ranges.
+ for (long i = 1; i < numHashFiles; i++) {
+ int splitIndex = (int) (numRegions * i / numHashFiles);
+ partitions.add(new ImmutableBytesWritable(startKeys.get(splitIndex)));
+ }
+ }
+
+ void writePartitionFile(Configuration conf, Path path) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ @SuppressWarnings("deprecation")
+ SequenceFile.Writer writer = SequenceFile.createWriter(
+ fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
+
+ for (int i = 0; i < partitions.size(); i++) {
+ writer.append(partitions.get(i), NullWritable.get());
+ }
+ writer.close();
+ }
+
+ private void readPartitionFile(FileSystem fs, Configuration conf, Path path)
+ throws IOException {
+ @SuppressWarnings("deprecation")
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ ImmutableBytesWritable key = new ImmutableBytesWritable();
+ partitions = new ArrayList<ImmutableBytesWritable>();
+ while (reader.next(key)) {
+ partitions.add(new ImmutableBytesWritable(key.copyBytes()));
+ }
+ reader.close();
+
+ if (!Ordering.natural().isOrdered(partitions)) {
+ throw new IOException("Partitions are not ordered!");
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("tableName=").append(tableName);
+ if (families != null) {
+ sb.append(", families=").append(families);
+ }
+ sb.append(", batchSize=").append(batchSize);
+ sb.append(", numHashFiles=").append(numHashFiles);
+ if (!isTableStartRow(startRow)) {
+ sb.append(", startRowHex=").append(Bytes.toHex(startRow));
+ }
+ if (!isTableEndRow(stopRow)) {
+ sb.append(", stopRowHex=").append(Bytes.toHex(stopRow));
+ }
+ if (scanBatch >= 0) {
+ sb.append(", scanBatch=").append(scanBatch);
+ }
+ if (versions >= 0) {
+ sb.append(", versions=").append(versions);
+ }
+ if (startTime != 0) {
+ sb.append(", startTime=").append(startTime);
+ }
+ if (endTime != 0) {
+ sb.append(", endTime=").append(endTime);
+ }
+ return sb.toString();
+ }
+
+ static String getDataFileName(int hashFileIndex) {
+ return String.format(HashTable.OUTPUT_DATA_FILE_PREFIX + "%05d", hashFileIndex);
+ }
+
+ /**
+ * Open a TableHash.Reader starting at the first hash at or after the given key.
+ * @throws IOException
+ */
+ public Reader newReader(Configuration conf, ImmutableBytesWritable startKey)
+ throws IOException {
+ return new Reader(conf, startKey);
+ }
+
+ public class Reader implements java.io.Closeable {
+ private final Configuration conf;
+
+ private int hashFileIndex;
+ private MapFile.Reader mapFileReader;
+
+ private boolean cachedNext;
+ private ImmutableBytesWritable key;
+ private ImmutableBytesWritable hash;
+
+ Reader(Configuration conf, ImmutableBytesWritable startKey) throws IOException {
+ this.conf = conf;
+ int partitionIndex = Collections.binarySearch(partitions, startKey);
+ if (partitionIndex >= 0) {
+ // if the key is equal to a partition, then go to the file after that partition
+ hashFileIndex = partitionIndex+1;
+ } else {
+ // if the key is between partitions, then go to the file between those partitions
+ hashFileIndex = -1-partitionIndex;
+ }
+ openHashFile();
+
+ // MapFiles don't make it easy to seek() so that the subsequent next() returns
+ // the desired key/value pair. So we cache it for the first call of next().
+ hash = new ImmutableBytesWritable();
+ key = (ImmutableBytesWritable) mapFileReader.getClosest(startKey, hash);
+ if (key == null) {
+ cachedNext = false;
+ hash = null;
+ } else {
+ cachedNext = true;
+ }
+ }
+
+ /**
+ * Read the next key/hash pair.
+ * Returns true if such a pair exists and false when at the end of the data.
+ */
+ public boolean next() throws IOException {
+ if (cachedNext) {
+ cachedNext = false;
+ return true;
+ }
+ key = new ImmutableBytesWritable();
+ hash = new ImmutableBytesWritable();
+ while (true) {
+ boolean hasNext = mapFileReader.next(key, hash);
+ if (hasNext) {
+ return true;
+ }
+ hashFileIndex++;
+ if (hashFileIndex < TableHash.this.numHashFiles) {
+ mapFileReader.close();
+ openHashFile();
+ } else {
+ key = null;
+ hash = null;
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Get the current key
+ * @return the current key or null if there is no current key
+ */
+ public ImmutableBytesWritable getCurrentKey() {
+ return key;
+ }
+
+ /**
+ * Get the current hash
+ * @return the current hash or null if there is no current hash
+ */
+ public ImmutableBytesWritable getCurrentHash() {
+ return hash;
+ }
+
+ private void openHashFile() throws IOException {
+ if (mapFileReader != null) {
+ mapFileReader.close();
+ }
+ Path dataDir = new Path(TableHash.this.hashDir, HASH_DATA_DIR);
+ Path dataFile = new Path(dataDir, getDataFileName(hashFileIndex));
+ mapFileReader = new MapFile.Reader(dataFile, conf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ mapFileReader.close();
+ }
+ }
+ }
+
+ static boolean isTableStartRow(byte[] row) {
+ return Bytes.equals(HConstants.EMPTY_START_ROW, row);
+ }
+
+ static boolean isTableEndRow(byte[] row) {
+ return Bytes.equals(HConstants.EMPTY_END_ROW, row);
+ }
+
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
+ generatePartitions(partitionsPath);
+
+ Job job = Job.getInstance(getConf(),
+ getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
+ Configuration jobConf = job.getConfiguration();
+ jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
+ job.setJarByClass(HashTable.class);
+
+ TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
+ HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+
+ // use a TotalOrderPartitioner and reducers to group region output into hash files
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
+ job.setReducerClass(Reducer.class); // identity reducer
+ job.setNumReduceTasks(tableHash.numHashFiles);
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(ImmutableBytesWritable.class);
+ job.setOutputFormatClass(MapFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));
+
+ return job;
+ }
+
+ private void generatePartitions(Path partitionsPath) throws IOException {
+ Pair<byte[][], byte[][]> regionKeys;
+ HConnection connection = HConnectionManager.createConnection(getConf());
+ try {
+ HTable table = (HTable)connection.getTable(TableName.valueOf(tableHash.tableName));
+ try {
+ regionKeys = table.getStartEndKeys();
+ } finally {
+ table.close();
+ }
+ } finally {
+ connection.close();
+ }
+
+ tableHash.selectPartitions(regionKeys);
+ LOG.info("Writing " + tableHash.partitions.size() + " partition keys to " + partitionsPath);
+
+ tableHash.writePartitionFile(getConf(), partitionsPath);
+ }
+
+ static class ResultHasher {
+ private MessageDigest digest;
+
+ private boolean batchStarted = false;
+ private ImmutableBytesWritable batchStartKey;
+ private ImmutableBytesWritable batchHash;
+ private long batchSize = 0;
+
+
+ public ResultHasher() {
+ try {
+ digest = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ Throwables.propagate(e);
+ }
+ }
+
+ public void startBatch(ImmutableBytesWritable row) {
+ if (batchStarted) {
+ throw new RuntimeException("Cannot start new batch without finishing existing one.");
+ }
+ batchStarted = true;
+ batchSize = 0;
+ batchStartKey = row;
+ batchHash = null;
+ }
+
+ public void hashResult(Result result) {
+ if (!batchStarted) {
+ throw new RuntimeException("Cannot add to batch that has not been started.");
+ }
+ for (Cell cell : result.rawCells()) {
+ int rowLength = cell.getRowLength();
+ int familyLength = cell.getFamilyLength();
+ int qualifierLength = cell.getQualifierLength();
+ int valueLength = cell.getValueLength();
+ digest.update(cell.getRowArray(), cell.getRowOffset(), rowLength);
+ digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), familyLength);
+ digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength);
+ long ts = cell.getTimestamp();
+ for (int i = 8; i > 0; i--) {
+ digest.update((byte) ts);
+ ts >>>= 8;
+ }
+ digest.update(cell.getValueArray(), cell.getValueOffset(), valueLength);
+
+ batchSize += rowLength + familyLength + qualifierLength + 8 + valueLength;
+ }
+ }
+
+ public void finishBatch() {
+ if (!batchStarted) {
+ throw new RuntimeException("Cannot finish batch that has not started.");
+ }
+ batchStarted = false;
+ batchHash = new ImmutableBytesWritable(digest.digest());
+ }
+
+ public boolean isBatchStarted() {
+ return batchStarted;
+ }
+
+ public ImmutableBytesWritable getBatchStartKey() {
+ return batchStartKey;
+ }
+
+ public ImmutableBytesWritable getBatchHash() {
+ return batchHash;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+ }
+
+ public static class HashMapper
+ extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ private ResultHasher hasher;
+ private long targetBatchSize;
+
+ private ImmutableBytesWritable currentRow;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ targetBatchSize = context.getConfiguration()
+ .getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
+ hasher = new ResultHasher();
+
+ TableSplit split = (TableSplit) context.getInputSplit();
+ hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
+ }
+
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+
+ if (currentRow == null || !currentRow.equals(key)) {
+ currentRow = new ImmutableBytesWritable(key); // copy the key; the framework reuses the passed-in instance
+
+ if (hasher.getBatchSize() >= targetBatchSize) {
+ hasher.finishBatch();
+ context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+ hasher.startBatch(currentRow);
+ }
+ }
+
+ hasher.hashResult(value);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ hasher.finishBatch();
+ context.write(hasher.getBatchStartKey(), hasher.getBatchHash());
+ }
+ }
+
+ private void writeTempManifestFile() throws IOException {
+ Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+ FileSystem fs = tempManifestPath.getFileSystem(getConf());
+ tableHash.writePropertiesFile(fs, tempManifestPath);
+ }
+
+ private void completeManifest() throws IOException {
+ Path tempManifestPath = new Path(destPath, TMP_MANIFEST_FILE_NAME);
+ Path manifestPath = new Path(destPath, MANIFEST_FILE_NAME);
+ FileSystem fs = tempManifestPath.getFileSystem(getConf());
+ fs.rename(tempManifestPath, manifestPath);
+ }
+
+ private static final int NUM_ARGS = 2;
+ private static void printUsage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ System.err.println();
+ }
+ System.err.println("Usage: HashTable [options] <tablename> <outputpath>");
+ System.err.println();
+ System.err.println("Options:");
+ System.err.println(" batchsize the target amount of bytes to hash in each batch");
+ System.err.println(" rows are added to the batch until this size is reached");
+ System.err.println(" (defaults to " + DEFAULT_BATCH_SIZE + " bytes)");
+ System.err.println(" numhashfiles the number of hash files to create");
+ System.err.println(" if set to fewer than number of regions then");
+ System.err.println(" the job will create this number of reducers");
+ System.err.println(" (defaults to 1/100 of regions -- at least 1)");
+ System.err.println(" startrow the start row");
+ System.err.println(" stoprow the stop row");
+ System.err.println(" starttime beginning of the time range (unixtime in millis)");
+ System.err.println(" without endtime means from starttime to forever");
+ System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
+ System.err.println(" scanbatch scanner batch size to support intra row scans");
+ System.err.println(" versions number of cell versions to include");
+ System.err.println(" families comma-separated list of families to include");
+ System.err.println();
+ System.err.println("Args:");
+ System.err.println(" tablename Name of the table to hash");
+ System.err.println(" outputpath Filesystem path to put the output data");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" To hash 'TestTable' in 32kB batches for a 1 hour window into 50 files:");
+ System.err.println(" $ bin/hbase " +
+ "org.apache.hadoop.hbase.mapreduce.HashTable --batchsize=32000 --numhashfiles=50"
+ + " --starttime=1265875194289 --endtime=1265878794289 --families=cf2,cf3"
+ + " TestTable /hashes/testTable");
+ }
+
+ private boolean doCommandLine(final String[] args) {
+ if (args.length < NUM_ARGS) {
+ printUsage(null);
+ return false;
+ }
+ try {
+
+ tableHash.tableName = args[args.length-2];
+ destPath = new Path(args[args.length-1]);
+
+ for (int i = 0; i < args.length - NUM_ARGS; i++) {
+ String cmd = args[i];
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage(null);
+ return false;
+ }
+
+ final String batchSizeArgKey = "--batchsize=";
+ if (cmd.startsWith(batchSizeArgKey)) {
+ tableHash.batchSize = Long.parseLong(cmd.substring(batchSizeArgKey.length()));
+ continue;
+ }
+
+ final String numHashFilesArgKey = "--numhashfiles=";
+ if (cmd.startsWith(numHashFilesArgKey)) {
+ tableHash.numHashFiles = Integer.parseInt(cmd.substring(numHashFilesArgKey.length()));
+ continue;
+ }
+
+ final String startRowArgKey = "--startrow=";
+ if (cmd.startsWith(startRowArgKey)) {
+ tableHash.startRow = Bytes.fromHex(cmd.substring(startRowArgKey.length()));
+ continue;
+ }
+
+ final String stopRowArgKey = "--stoprow=";
+ if (cmd.startsWith(stopRowArgKey)) {
+ tableHash.stopRow = Bytes.fromHex(cmd.substring(stopRowArgKey.length()));
+ continue;
+ }
+
+ final String startTimeArgKey = "--starttime=";
+ if (cmd.startsWith(startTimeArgKey)) {
+ tableHash.startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
+ continue;
+ }
+
+ final String endTimeArgKey = "--endtime=";
+ if (cmd.startsWith(endTimeArgKey)) {
+ tableHash.endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
+ continue;
+ }
+
+ final String scanBatchArgKey = "--scanbatch=";
+ if (cmd.startsWith(scanBatchArgKey)) {
+ tableHash.scanBatch = Integer.parseInt(cmd.substring(scanBatchArgKey.length()));
+ continue;
+ }
+
+ final String versionsArgKey = "--versions=";
+ if (cmd.startsWith(versionsArgKey)) {
+ tableHash.versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
+ continue;
+ }
+
+ final String familiesArgKey = "--families=";
+ if (cmd.startsWith(familiesArgKey)) {
+ tableHash.families = cmd.substring(familiesArgKey.length());
+ continue;
+ }
+
+ printUsage("Invalid argument '" + cmd + "'");
+ return false;
+ }
+ if ((tableHash.startTime != 0 || tableHash.endTime != 0)
+ && (tableHash.startTime >= tableHash.endTime)) {
+ printUsage("Invalid time range filter: starttime="
+ + tableHash.startTime + " >= endtime=" + tableHash.endTime);
+ return false;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage("Can't start because " + e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Main entry point.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new HashTable(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+ if (!doCommandLine(otherArgs)) {
+ return 1;
+ }
+
+ Job job = createSubmittableJob(otherArgs);
+ writeTempManifestFile();
+ if (!job.waitForCompletion(true)) {
+ LOG.info("Map-reduce job failed!");
+ return 1;
+ }
+ completeManifest();
+ return 0;
+ }
+
+}
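The directory HashTable writes (manifest, partitions file, and MapFiles of key/hash pairs) can be read back through TableHash.read and its nested Reader, which is how the SyncTable mapper below consumes it. A hedged sketch; the path and the DumpTableHashes class are illustrative placeholders:

    // Sketch only - reads a finished HashTable output dir; the path is a placeholder.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HashTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DumpTableHashes {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HashTable.TableHash hash =
            HashTable.TableHash.read(conf, new Path("/hashes/testTable"));
        System.out.println(hash); // manifest: tableName, batchSize, numHashFiles, ...

        // iterate key/hash pairs starting from the beginning of the table
        ImmutableBytesWritable start = new ImmutableBytesWritable(new byte[0]);
        HashTable.TableHash.Reader reader = hash.newReader(conf, start);
        try {
          while (reader.next()) {
            System.out.println(Bytes.toHex(reader.getCurrentKey().copyBytes()) + " -> "
                + Bytes.toHex(reader.getCurrentHash().copyBytes()));
          }
        } finally {
          reader.close();
        }
      }
    }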
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
new file mode 100644
index 0000000..37d91b1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -0,0 +1,772 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
+
+public class SyncTable extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(SyncTable.class);
+
+ static final String SOURCE_HASH_DIR_CONF_KEY = "sync.table.source.hash.dir";
+ static final String SOURCE_TABLE_CONF_KEY = "sync.table.source.table.name";
+ static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
+ static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
+ static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
+ static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
+
+ Path sourceHashDir;
+ String sourceTableName;
+ String targetTableName;
+
+ String sourceZkCluster;
+ String targetZkCluster;
+ boolean dryRun;
+
+ Counters counters;
+
+ public SyncTable(Configuration conf) {
+ super(conf);
+ }
+
+ public Job createSubmittableJob(String[] args) throws IOException {
+ FileSystem fs = sourceHashDir.getFileSystem(getConf());
+ if (!fs.exists(sourceHashDir)) {
+ throw new IOException("Source hash dir not found: " + sourceHashDir);
+ }
+
+ HashTable.TableHash tableHash = HashTable.TableHash.read(getConf(), sourceHashDir);
+ LOG.info("Read source hash manifest: " + tableHash);
+ LOG.info("Read " + tableHash.partitions.size() + " partition keys");
+ if (!tableHash.tableName.equals(sourceTableName)) {
+ LOG.warn("Table name mismatch - manifest indicates hash was taken from: "
+ + tableHash.tableName + " but job is reading from: " + sourceTableName);
+ }
+ if (tableHash.numHashFiles != tableHash.partitions.size() + 1) {
+ throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
+ + " should be 1 more than the number of partition keys. However, the manifest file "
+ + " says numHashFiles=" + tableHash.numHashFiles + " but the number of partition keys"
+ + " found in the partitions file is " + tableHash.partitions.size());
+ }
+
+ Path dataDir = new Path(sourceHashDir, HashTable.HASH_DATA_DIR);
+ int dataSubdirCount = 0;
+ for (FileStatus file : fs.listStatus(dataDir)) {
+ if (file.getPath().getName().startsWith(HashTable.OUTPUT_DATA_FILE_PREFIX)) {
+ dataSubdirCount++;
+ }
+ }
+
+ if (dataSubdirCount != tableHash.numHashFiles) {
+ throw new RuntimeException("Hash data appears corrupt. The number of of hash files created"
+ + " should be 1 more than the number of partition keys. However, the number of data dirs"
+ + " found is " + dataSubdirCount + " but the number of partition keys"
+ + " found in the partitions file is " + tableHash.partitions.size());
+ }
+
+ Job job = Job.getInstance(getConf(),getConf().get("mapreduce.job.name",
+ "syncTable_" + sourceTableName + "-" + targetTableName));
+ Configuration jobConf = job.getConfiguration();
+ job.setJarByClass(HashTable.class);
+ jobConf.set(SOURCE_HASH_DIR_CONF_KEY, sourceHashDir.toString());
+ jobConf.set(SOURCE_TABLE_CONF_KEY, sourceTableName);
+ jobConf.set(TARGET_TABLE_CONF_KEY, targetTableName);
+ if (sourceZkCluster != null) {
+ jobConf.set(SOURCE_ZK_CLUSTER_CONF_KEY, sourceZkCluster);
+ }
+ if (targetZkCluster != null) {
+ jobConf.set(TARGET_ZK_CLUSTER_CONF_KEY, targetZkCluster);
+ }
+ jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
+
+ TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
+ SyncMapper.class, null, null, job);
+
+ job.setNumReduceTasks(0);
+
+ if (dryRun) {
+ job.setOutputFormatClass(NullOutputFormat.class);
+ } else {
+ // No reducers. Just write straight to table. Call initTableReducerJob
+ // because it sets up the TableOutputFormat.
+ TableMapReduceUtil.initTableReducerJob(targetTableName, null, job, null,
+ targetZkCluster, null, null);
+
+ // would be nice to add an option for bulk load instead
+ }
+
+ return job;
+ }
+
+ public static class SyncMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
+ Path sourceHashDir;
+
+ HConnection sourceConnection;
+ HConnection targetConnection;
+ HTableInterface sourceTable;
+ HTableInterface targetTable;
+ boolean dryRun;
+
+ HashTable.TableHash sourceTableHash;
+ HashTable.TableHash.Reader sourceHashReader;
+ ImmutableBytesWritable currentSourceHash;
+ ImmutableBytesWritable nextSourceKey;
+ HashTable.ResultHasher targetHasher;
+
+ Throwable mapperException;
+
+ public static enum Counter {BATCHES, HASHES_MATCHED, HASHES_NOT_MATCHED, SOURCEMISSINGROWS,
+ SOURCEMISSINGCELLS, TARGETMISSINGROWS, TARGETMISSINGCELLS, ROWSWITHDIFFS, DIFFERENTCELLVALUES,
+ MATCHINGROWS, MATCHINGCELLS, EMPTY_BATCHES, RANGESMATCHED, RANGESNOTMATCHED};
+
+ @Override
+ protected void setup(Context context) throws IOException {
+
+ Configuration conf = context.getConfiguration();
+ sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
+ sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
+ targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+ sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
+ targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
+ dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+
+ sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
+ LOG.info("Read source hash manifest: " + sourceTableHash);
+ LOG.info("Read " + sourceTableHash.partitions.size() + " partition keys");
+
+ TableSplit split = (TableSplit) context.getInputSplit();
+ ImmutableBytesWritable splitStartKey = new ImmutableBytesWritable(split.getStartRow());
+
+ sourceHashReader = sourceTableHash.newReader(conf, splitStartKey);
+ findNextKeyHashPair();
+
+ // create a hasher, but don't start it right away
+ // instead, find the first hash batch at or after the start row
+ // and skip any rows that come before. they will be caught by the previous task
+ targetHasher = new HashTable.ResultHasher();
+ }
+
+ private static HConnection openConnection(Configuration conf, String zkClusterConfKey)
+ throws IOException {
+ Configuration clusterConf = new Configuration(conf);
+ String zkCluster = conf.get(zkClusterConfKey);
+ if (zkCluster != null) {
+ ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
+ }
+ return HConnectionManager.createConnection(clusterConf);
+ }
+
+ private static HTableInterface openTable(HConnection connection, Configuration conf,
+ String tableNameConfKey) throws IOException {
+ return connection.getTable(TableName.valueOf(conf.get(tableNameConfKey)));
+ }
+
+ /**
+ * Attempt to read the next source key/hash pair.
+ * If there are no more, set nextSourceKey to null
+ */
+ private void findNextKeyHashPair() throws IOException {
+ boolean hasNext = sourceHashReader.next();
+ if (hasNext) {
+ nextSourceKey = sourceHashReader.getCurrentKey();
+ } else {
+ // no more keys - last hash goes to the end
+ nextSourceKey = null;
+ }
+ }
+
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException, InterruptedException {
+ try {
+ // first, finish any hash batches that end before the scanned row
+ while (nextSourceKey != null && key.compareTo(nextSourceKey) >= 0) {
+ moveToNextBatch(context);
+ }
+
+ // next, add the scanned row (as long as we've reached the first batch)
+ if (targetHasher.isBatchStarted()) {
+ targetHasher.hashResult(value);
+ }
+ } catch (Throwable t) {
+ mapperException = t;
+ Throwables.propagateIfInstanceOf(t, IOException.class);
+ Throwables.propagateIfInstanceOf(t, InterruptedException.class);
+ Throwables.propagate(t);
+ }
+ }
+
+ /**
+ * If there is an open hash batch, complete it and sync if there are diffs.
+ * Start a new batch, and seek to read the
+ */
+ private void moveToNextBatch(Context context) throws IOException, InterruptedException {
+ if (targetHasher.isBatchStarted()) {
+ finishBatchAndCompareHashes(context);
+ }
+ targetHasher.startBatch(nextSourceKey);
+ currentSourceHash = sourceHashReader.getCurrentHash();
+
+ findNextKeyHashPair();
+ }
+
+ /**
+ * Finish the currently open hash batch.
+ * Compare the target hash to the given source hash.
+ * If they do not match, then sync the covered key range.
+ */
+ private void finishBatchAndCompareHashes(Context context)
+ throws IOException, InterruptedException {
+ targetHasher.finishBatch();
+ context.getCounter(Counter.BATCHES).increment(1);
+ if (targetHasher.getBatchSize() == 0) {
+ context.getCounter(Counter.EMPTY_BATCHES).increment(1);
+ }
+ ImmutableBytesWritable targetHash = targetHasher.getBatchHash();
+ if (targetHash.equals(currentSourceHash)) {
+ context.getCounter(Counter.HASHES_MATCHED).increment(1);
+ } else {
+ context.getCounter(Counter.HASHES_NOT_MATCHED).increment(1);
+
+ ImmutableBytesWritable stopRow = nextSourceKey == null
+ ? new ImmutableBytesWritable(sourceTableHash.stopRow)
+ : nextSourceKey;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hash mismatch. Key range: " + toHex(targetHasher.getBatchStartKey())
+ + " to " + toHex(stopRow)
+ + " sourceHash: " + toHex(currentSourceHash)
+ + " targetHash: " + toHex(targetHash));
+ }
+
+ syncRange(context, targetHasher.getBatchStartKey(), stopRow);
+ }
+ }
+ private static String toHex(ImmutableBytesWritable bytes) {
+ return Bytes.toHex(bytes.get(), bytes.getOffset(), bytes.getLength());
+ }
+
+ private static final CellScanner EMPTY_CELL_SCANNER
+ = new CellScanner(Iterators.<Result>emptyIterator());
+
+ /**
+ * Rescan the given range directly from the source and target tables.
+ * Count and log differences, and if this is not a dry run, output Puts and Deletes
+ * to make the target table match the source table for this range
+ */
+ private void syncRange(Context context, ImmutableBytesWritable startRow,
+ ImmutableBytesWritable stopRow) throws IOException, InterruptedException {
+
+ Scan scan = sourceTableHash.initScan();
+ scan.setStartRow(startRow.copyBytes());
+ scan.setStopRow(stopRow.copyBytes());
+
+ ResultScanner sourceScanner = sourceTable.getScanner(scan);
+ CellScanner sourceCells = new CellScanner(sourceScanner.iterator());
+
+ ResultScanner targetScanner = targetTable.getScanner(scan);
+ CellScanner targetCells = new CellScanner(targetScanner.iterator());
+
+ boolean rangeMatched = true;
+ byte[] nextSourceRow = sourceCells.nextRow();
+ byte[] nextTargetRow = targetCells.nextRow();
+ while(nextSourceRow != null || nextTargetRow != null) {
+ boolean rowMatched;
+ int rowComparison = compareRowKeys(nextSourceRow, nextTargetRow);
+ if (rowComparison < 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Target missing row: " + Bytes.toHex(nextSourceRow));
+ }
+ context.getCounter(Counter.TARGETMISSINGROWS).increment(1);
+
+ rowMatched = syncRowCells(context, nextSourceRow, sourceCells, EMPTY_CELL_SCANNER);
+ nextSourceRow = sourceCells.nextRow(); // advance only source to next row
+ } else if (rowComparison > 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Source missing row: " + Bytes.toHex(nextTargetRow));
+ }
+ context.getCounter(Counter.SOURCEMISSINGROWS).increment(1);
+
+ rowMatched = syncRowCells(context, nextTargetRow, EMPTY_CELL_SCANNER, targetCells);
+ nextTargetRow = targetCells.nextRow(); // advance only target to next row
+ } else {
+ // current row is the same on both sides, compare cell by cell
+ rowMatched = syncRowCells(context, nextSourceRow, sourceCells, targetCells);
+ nextSourceRow = sourceCells.nextRow();
+ nextTargetRow = targetCells.nextRow();
+ }
+
+ if (!rowMatched) {
+ rangeMatched = false;
+ }
+ }
+
+ sourceScanner.close();
+ targetScanner.close();
+
+ context.getCounter(rangeMatched ? Counter.RANGESMATCHED : Counter.RANGESNOTMATCHED)
+ .increment(1);
+ }
+
+ private static class CellScanner {
+ private final Iterator<Result> results;
+
+ private byte[] currentRow;
+ private Result currentRowResult;
+ private int nextCellInRow;
+
+ private Result nextRowResult;
+
+ public CellScanner(Iterator<Result> results) {
+ this.results = results;
+ }
+
+ /**
+ * Advance to the next row and return its row key.
+ * Returns null iff there are no more rows.
+ */
+ public byte[] nextRow() {
+ if (nextRowResult == null) {
+ // no cached row - check scanner for more
+ while (results.hasNext()) {
+ nextRowResult = results.next();
+ Cell nextCell = nextRowResult.rawCells()[0];
+ if (currentRow == null
+ || !Bytes.equals(currentRow, 0, currentRow.length, nextCell.getRowArray(),
+ nextCell.getRowOffset(), nextCell.getRowLength())) {
+ // found next row
+ break;
+ } else {
+ // found another result from current row, keep scanning
+ nextRowResult = null;
+ }
+ }
+
+ if (nextRowResult == null) {
+ // end of data, no more rows
+ currentRowResult = null;
+ currentRow = null;
+ return null;
+ }
+ }
+
+ // advance to cached result for next row
+ currentRowResult = nextRowResult;
+ nextCellInRow = 0;
+ currentRow = currentRowResult.getRow();
+ nextRowResult = null;
+ return currentRow;
+ }
+
+ /**
+ * Returns the next Cell in the current row or null iff none remain.
+ */
+ public Cell nextCellInRow() {
+ if (currentRowResult == null) {
+ // nothing left in current row
+ return null;
+ }
+
+ Cell nextCell = currentRowResult.rawCells()[nextCellInRow];
+ nextCellInRow++;
+ if (nextCellInRow == currentRowResult.size()) {
+ if (results.hasNext()) {
+ Result result = results.next();
+ Cell cell = result.rawCells()[0];
+ if (Bytes.equals(currentRow, 0, currentRow.length, cell.getRowArray(),
+ cell.getRowOffset(), cell.getRowLength())) {
+ // result is part of current row
+ currentRowResult = result;
+ nextCellInRow = 0;
+ } else {
+ // result is part of next row, cache it
+ nextRowResult = result;
+ // current row is complete
+ currentRowResult = null;
+ }
+ } else {
+ // end of data
+ currentRowResult = null;
+ }
+ }
+ return nextCell;
+ }
+ }
+
+ /**
+ * Compare the cells for the given row from the source and target tables.
+ * Count and log any differences.
+ * If not a dry run, output a Put and/or Delete needed to sync the target table
+ * to match the source table.
+ */
+ private boolean syncRowCells(Context context, byte[] rowKey, CellScanner sourceCells,
+ CellScanner targetCells) throws IOException, InterruptedException {
+ Put put = null;
+ Delete delete = null;
+ long matchingCells = 0;
+ boolean matchingRow = true;
+ Cell sourceCell = sourceCells.nextCellInRow();
+ Cell targetCell = targetCells.nextCellInRow();
+ while (sourceCell != null || targetCell != null) {
+
+ int cellKeyComparison = compareCellKeysWithinRow(sourceCell, targetCell);
+ if (cellKeyComparison < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Target missing cell: " + sourceCell);
+ }
+ context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
+ matchingRow = false;
+
+ if (!dryRun) {
+ if (put == null) {
+ put = new Put(rowKey);
+ }
+ put.add(sourceCell);
+ }
+
+ sourceCell = sourceCells.nextCellInRow();
+ } else if (cellKeyComparison > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Source missing cell: " + targetCell);
+ }
+ context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
+ matchingRow = false;
+
+ if (!dryRun) {
+ if (delete == null) {
+ delete = new Delete(rowKey);
+ }
+ // add a tombstone to exactly match the target cell that is missing on the source
+ delete.deleteColumn(CellUtil.cloneFamily(targetCell),
+ CellUtil.cloneQualifier(targetCell), targetCell.getTimestamp());
+ }
+
+ targetCell = targetCells.nextCellInRow();
+ } else {
+ // the cell keys are equal, now check values
+ if (CellUtil.matchingValue(sourceCell, targetCell)) {
+ matchingCells++;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Different values: ");
+ LOG.debug(" source cell: " + sourceCell
+ + " value: " + Bytes.toHex(sourceCell.getValueArray(),
+ sourceCell.getValueOffset(), sourceCell.getValueLength()));
+ LOG.debug(" target cell: " + targetCell
+ + " value: " + Bytes.toHex(targetCell.getValueArray(),
+ targetCell.getValueOffset(), targetCell.getValueLength()));
+ }
+ context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
+ matchingRow = false;
+
+ if (!dryRun) {
+ // overwrite target cell
+ if (put == null) {
+ put = new Put(rowKey);
+ }
+ put.add(sourceCell);
+ }
+ }
+ sourceCell = sourceCells.nextCellInRow();
+ targetCell = targetCells.nextCellInRow();
+ }
+
+ if (!dryRun && sourceTableHash.scanBatch > 0) {
+ if (put != null && put.size() >= sourceTableHash.scanBatch) {
+ context.write(new ImmutableBytesWritable(rowKey), put);
+ put = null;
+ }
+ if (delete != null && delete.size() >= sourceTableHash.scanBatch) {
+ context.write(new ImmutableBytesWritable(rowKey), delete);
+ delete = null;
+ }
+ }
+ }
+
+ if (!dryRun) {
+ if (put != null) {
+ context.write(new ImmutableBytesWritable(rowKey), put);
+ }
+ if (delete != null) {
+ context.write(new ImmutableBytesWritable(rowKey), delete);
+ }
+ }
+
+ if (matchingCells > 0) {
+ context.getCounter(Counter.MATCHINGCELLS).increment(matchingCells);
+ }
+ if (matchingRow) {
+ context.getCounter(Counter.MATCHINGROWS).increment(1);
+ return true;
+ } else {
+ context.getCounter(Counter.ROWSWITHDIFFS).increment(1);
+ return false;
+ }
+ }
+
+ /**
+ * Compare the given row keys.
+ * Nulls are after non-nulls
+ */
+ private static int compareRowKeys(byte[] r1, byte[] r2) {
+ if (r1 == null) {
+ return 1; // source missing row
+ } else if (r2 == null) {
+ return -1; // target missing row
+ } else {
+ return CellComparator.compareRows(r1, 0, r1.length, r2, 0, r2.length);
+ }
+ }
+
+ /**
+ * Compare families, qualifiers, and timestamps of the given Cells.
+ * They are assumed to be of the same row.
+ * Nulls are after non-nulls.
+ */
+ private static int compareCellKeysWithinRow(Cell c1, Cell c2) {
+ if (c1 == null) {
+ return 1; // source missing cell
+ }
+ if (c2 == null) {
+ return -1; // target missing cell
+ }
+
+ int result = CellComparator.compareFamilies(c1, c2);
+ if (result != 0) {
+ return result;
+ }
+
+ result = CellComparator.compareQualifiers(c1, c2);
+ if (result != 0) {
+ return result;
+ }
+
+ // note timestamp comparison is inverted - more recent cells first
+ return CellComparator.compareTimestamps(c1, c2);
+ }
+
+ @Override
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ if (mapperException == null) {
+ try {
+ finishRemainingHashRanges(context);
+ } catch (Throwable t) {
+ mapperException = t;
+ }
+ }
+
+ try {
+ sourceTable.close();
+ targetTable.close();
+ sourceConnection.close();
+ targetConnection.close();
+ } catch (Throwable t) {
+ if (mapperException == null) {
+ mapperException = t;
+ } else {
+ LOG.error("Suppressing exception from closing tables", t);
+ }
+ }
+
+ // propagate first exception
+ if (mapperException != null) {
+ Throwables.propagateIfInstanceOf(mapperException, IOException.class);
+ Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class);
+ Throwables.propagate(mapperException);
+ }
+ }
+
+ private void finishRemainingHashRanges(Context context) throws IOException,
+ InterruptedException {
+ TableSplit split = (TableSplit) context.getInputSplit();
+ byte[] splitEndRow = split.getEndRow();
+ boolean reachedEndOfTable = HashTable.isTableEndRow(splitEndRow);
+
+ // if there are more hash batches that begin before the end of this split, move to them
+ while (nextSourceKey != null
+ && (nextSourceKey.compareTo(splitEndRow) < 0 || reachedEndOfTable)) {
+ moveToNextBatch(context);
+ }
+
+ if (targetHasher.isBatchStarted()) {
+ // need to complete the final open hash batch
+
+ if ((nextSourceKey != null && nextSourceKey.compareTo(splitEndRow) > 0)
+ || (nextSourceKey == null && !Bytes.equals(splitEndRow, sourceTableHash.stopRow))) {
+ // the open hash range continues past the end of this region
+ // add a scan to complete the current hash range
+ Scan scan = sourceTableHash.initScan();
+ scan.setStartRow(splitEndRow);
+ if (nextSourceKey == null) {
+ scan.setStopRow(sourceTableHash.stopRow);
+ } else {
+ scan.setStopRow(nextSourceKey.copyBytes());
+ }
+
+ ResultScanner targetScanner = targetTable.getScanner(scan);
+ for (Result row : targetScanner) {
+ targetHasher.hashResult(row);
+ }
+ } // else current batch ends exactly at split end row
+
+ finishBatchAndCompareHashes(context);
+ }
+ }
+ }
+
+ private static final int NUM_ARGS = 3;
+ private static void printUsage(final String errorMsg) {
+ if (errorMsg != null && errorMsg.length() > 0) {
+ System.err.println("ERROR: " + errorMsg);
+ System.err.println();
+ }
+ System.err.println("Usage: SyncTable [options] <sourcehashdir> <sourcetable> <targettable>");
+ System.err.println();
+ System.err.println("Options:");
+
+ System.err.println(" sourcezkcluster ZK cluster key of the source table");
+ System.err.println(" (defaults to cluster in classpath's config)");
+ System.err.println(" targetzkcluster ZK cluster key of the target table");
+ System.err.println(" (defaults to cluster in classpath's config)");
+ System.err.println(" dryrun if true, output counters but no writes");
+ System.err.println(" (defaults to false)");
+ System.err.println();
+ System.err.println("Args:");
+ System.err.println(" sourcehashdir path to HashTable output dir for source table");
+ System.err.println(" if not specified, then all data will be scanned");
+ System.err.println(" sourcetable Name of the source table to sync from");
+ System.err.println(" targettable Name of the target table to sync to");
+ System.err.println();
+ System.err.println("Examples:");
+ System.err.println(" For a dry run SyncTable of tableA from a remote source cluster");
+ System.err.println(" to a local target cluster:");
+ System.err.println(" $ bin/hbase " +
+ "org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun=true"
+ + " --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase"
+ + " hdfs://nn:9000/hashes/tableA tableA tableA");
+ }
+
+ private boolean doCommandLine(final String[] args) {
+ if (args.length < NUM_ARGS) {
+ printUsage(null);
+ return false;
+ }
+ try {
+ sourceHashDir = new Path(args[args.length - 3]);
+ sourceTableName = args[args.length - 2];
+ targetTableName = args[args.length - 1];
+
+ for (int i = 0; i < args.length - NUM_ARGS; i++) {
+ String cmd = args[i];
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage(null);
+ return false;
+ }
+
+ final String sourceZkClusterKey = "--sourcezkcluster=";
+ if (cmd.startsWith(sourceZkClusterKey)) {
+ sourceZkCluster = cmd.substring(sourceZkClusterKey.length());
+ continue;
+ }
+
+ final String targetZkClusterKey = "--targetzkcluster=";
+ if (cmd.startsWith(targetZkClusterKey)) {
+ targetZkCluster = cmd.substring(targetZkClusterKey.length());
+ continue;
+ }
+
+ final String dryRunKey = "--dryrun=";
+ if (cmd.startsWith(dryRunKey)) {
+ dryRun = Boolean.parseBoolean(cmd.substring(dryRunKey.length()));
+ continue;
+ }
+
+ printUsage("Invalid argument '" + cmd + "'");
+ return false;
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage("Can't start because " + e.getMessage());
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Main entry point.
+ */
+ public static void main(String[] args) throws Exception {
+ int ret = ToolRunner.run(new SyncTable(HBaseConfiguration.create()), args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
+ if (!doCommandLine(otherArgs)) {
+ return 1;
+ }
+
+ Job job = createSubmittableJob(otherArgs);
+ if (!job.waitForCompletion(true)) {
+ LOG.info("Map-reduce job failed!");
+ return 1;
+ }
+ counters = job.getCounters();
+ return 0;
+ }
+
+}
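
For context, the two new tools are meant to be chained: HashTable is run first against the
source table to produce the hash output dir, and SyncTable is then run against the target
cluster's config to compare against those hashes and repair any differences. A minimal
sketch of that workflow, with placeholder paths, quorum, and option values (the option
names mirror the usage text above and the TestHashTable arguments below):

  $ bin/hbase org.apache.hadoop.hbase.mapreduce.HashTable \
      --batchsize=8000 --numhashfiles=10 \
      tableA hdfs://nn:9000/hashes/tableA

  $ bin/hbase org.apache.hadoop.hbase.mapreduce.SyncTable \
      --dryrun=true \
      --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase \
      hdfs://nn:9000/hashes/tableA tableA tableA

With --dryrun=true the job only reports counters (MATCHINGROWS, ROWSWITHDIFFS,
SOURCEMISSINGCELLS, TARGETMISSINGCELLS, DIFFERENTCELLVALUES, and so on); dropping the
flag (it defaults to false) lets SyncTable write the corresponding Puts and Deletes to
the target table.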
http://git-wip-us.apache.org/repos/asf/hbase/blob/61c9a250/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
new file mode 100644
index 0000000..906a3c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapFile;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/**
+ * Basic test for the HashTable M/R tool
+ */
+@Category(LargeTests.class)
+public class TestHashTable {
+
+ private static final Log LOG = LogFactory.getLog(TestHashTable.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testHashTable() throws Exception {
+ final String tableName = "testHashTable";
+ final byte[] family = Bytes.toBytes("family");
+ final byte[] column1 = Bytes.toBytes("c1");
+ final byte[] column2 = Bytes.toBytes("c2");
+ final byte[] column3 = Bytes.toBytes("c3");
+
+ int numRows = 100;
+ int numRegions = 10;
+ int numHashFiles = 3;
+
+ byte[][] splitRows = new byte[numRegions-1][];
+ for (int i = 1; i < numRegions; i++) {
+ splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
+ }
+
+ long timestamp = 1430764183454L;
+ // put rows into the table to be hashed
+ HTable t1 = TEST_UTIL.createTable(TableName.valueOf(tableName), family, splitRows);
+ for (int i = 0; i < numRows; i++) {
+ Put p = new Put(Bytes.toBytes(i), timestamp);
+ p.add(family, column1, column1);
+ p.add(family, column2, column2);
+ p.add(family, column3, column3);
+ t1.put(p);
+ }
+ t1.close();
+
+ HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
+
+ Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName);
+
+ long batchSize = 300;
+ int code = hashTable.run(new String[] {
+ "--batchsize=" + batchSize,
+ "--numhashfiles=" + numHashFiles,
+ "--scanbatch=2",
+ tableName,
+ testDir.toString()});
+ assertEquals("test job failed", 0, code);
+
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+ HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
+ assertEquals(tableName, tableHash.tableName);
+ assertEquals(batchSize, tableHash.batchSize);
+ assertEquals(numHashFiles, tableHash.numHashFiles);
+ assertEquals(numHashFiles - 1, tableHash.partitions.size());
+ for (ImmutableBytesWritable bytes : tableHash.partitions) {
+ LOG.debug("partition: " + Bytes.toInt(bytes.get()));
+ }
+
+ ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes
+ = ImmutableMap.<Integer, ImmutableBytesWritable>builder()
+ .put(-1, new ImmutableBytesWritable(Bytes.fromHex("714cb10a9e3b5569852980edd8c6ca2f")))
+ .put(5, new ImmutableBytesWritable(Bytes.fromHex("28d961d9252ce8f8d44a07b38d3e1d96")))
+ .put(10, new ImmutableBytesWritable(Bytes.fromHex("f6bbc4a224d8fd929b783a92599eaffa")))
+ .put(15, new ImmutableBytesWritable(Bytes.fromHex("522deb5d97f73a414ecc11457be46881")))
+ .put(20, new ImmutableBytesWritable(Bytes.fromHex("b026f2611aaa46f7110116d807545352")))
+ .put(25, new ImmutableBytesWritable(Bytes.fromHex("39ffc1a3094aa12a2e90ffd9cef2ce93")))
+ .put(30, new ImmutableBytesWritable(Bytes.fromHex("f6b4d75727ce9a30ac29e4f08f601666")))
+ .put(35, new ImmutableBytesWritable(Bytes.fromHex("422e2d2f1eb79a8f02171a705a42c090")))
+ .put(40, new ImmutableBytesWritable(Bytes.fromHex("559ad61c900fffefea0a15abf8a97bc3")))
+ .put(45, new ImmutableBytesWritable(Bytes.fromHex("23019084513eca41cee436b2a29611cb")))
+ .put(50, new ImmutableBytesWritable(Bytes.fromHex("b40467d222ddb4949b142fe145ee9edc")))
+ .put(55, new ImmutableBytesWritable(Bytes.fromHex("372bf89fcd8ca4b7ab3c1add9d07f7e4")))
+ .put(60, new ImmutableBytesWritable(Bytes.fromHex("69ae0585e6255de27dce974e332b8f8b")))
+ .put(65, new ImmutableBytesWritable(Bytes.fromHex("8029610044297aad0abdbecd485d8e59")))
+ .put(70, new ImmutableBytesWritable(Bytes.fromHex("de5f784f7f78987b6e57ecfd81c8646f")))
+ .put(75, new ImmutableBytesWritable(Bytes.fromHex("1cd757cc4e1715c8c3b1c24447a1ec56")))
+ .put(80, new ImmutableBytesWritable(Bytes.fromHex("f9a53aacfeb6142b08066615e7038095")))
+ .put(85, new ImmutableBytesWritable(Bytes.fromHex("89b872b7e639df32d3276b33928c0c91")))
+ .put(90, new ImmutableBytesWritable(Bytes.fromHex("45eeac0646d46a474ea0484175faed38")))
+ .put(95, new ImmutableBytesWritable(Bytes.fromHex("f57c447e32a08f4bf1abb2892839ac56")))
+ .build();
+
+ Map<Integer, ImmutableBytesWritable> actualHashes
+ = new HashMap<Integer, ImmutableBytesWritable>();
+ Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
+ for (int i = 0; i < numHashFiles; i++) {
+ Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
+
+ MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf());
+ ImmutableBytesWritable key = new ImmutableBytesWritable();
+ ImmutableBytesWritable hash = new ImmutableBytesWritable();
+ while(reader.next(key, hash)) {
+ String keyString = Bytes.toHex(key.get(), key.getOffset(), key.getLength());
+ LOG.debug("Key: " + (keyString.isEmpty() ? "-1" : Integer.parseInt(keyString, 16))
+ + " Hash: " + Bytes.toHex(hash.get(), hash.getOffset(), hash.getLength()));
+
+ int intKey = -1;
+ if (key.getLength() > 0) {
+ intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
+ }
+ if (actualHashes.containsKey(intKey)) {
+ Assert.fail("duplicate key in data files: " + intKey);
+ }
+ actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
+ }
+ reader.close();
+ }
+
+ FileStatus[] files = fs.listStatus(testDir);
+ for (FileStatus file : files) {
+ LOG.debug("Output file: " + file.getPath());
+ }
+
+ files = fs.listStatus(dataDir);
+ for (FileStatus file : files) {
+ LOG.debug("Data file: " + file.getPath());
+ }
+
+ if (!expectedHashes.equals(actualHashes)) {
+ LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
+ }
+ Assert.assertEquals(expectedHashes, actualHashes);
+
+ TEST_UTIL.deleteTable(tableName);
+ TEST_UTIL.cleanupDataTestDirOnTestFS();
+ }
+
+}
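
This test is tagged @Category(LargeTests.class). Assuming the project's standard Maven
Surefire setup (module name as in the file paths above), running just this test locally
would look roughly like:

  $ cd hbase-server
  $ mvn test -Dtest=TestHashTable

Depending on the active test profile, the LargeTests category may need to be enabled
explicitly. Note the fixed timestamp (1430764183454L) on every Put, which is presumably
what keeps the expected per-batch digests stable across runs.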