Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:28 UTC

[28/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
new file mode 100644
index 0000000..a9da98b
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A simple example of how operation attributes can be extracted from the
+ * parsed line and added to the resulting Puts.
+ */
+public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
+  @Override
+  protected void populatePut(byte[] lineBytes, ParsedLine parsed, Put put, int i)
+      throws BadTsvLineException, IOException {
+    KeyValue kv;
+    kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+        parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+        parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+        parsed.getColumnLength(i));
+    if (parsed.getIndividualAttributes() != null) {
+      String[] attributes = parsed.getIndividualAttributes();
+      for (String attr : attributes) {
+        String[] split = attr.split(ImportTsv.DEFAULT_ATTRIBUTES_SEPERATOR);
+        // String.split never returns null; a single-element result means the
+        // separator was missing from this attribute token.
+        if (split.length <= 1) {
+          throw new BadTsvLineException("Invalid attributes separator specified: " + attr);
+        } else {
+          if (split[0].length() <= 0 || split[1].length() <= 0) {
+            throw new BadTsvLineException("Invalid attributes separator specified: " + attr);
+          }
+          }
+          put.setAttribute(split[0], Bytes.toBytes(split[1]));
+        }
+      }
+    }
+    put.add(kv);
+  }
+}
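
This mapper only takes effect when ImportTsv is configured to use it and the
column spec includes the HBASE_ATTRIBUTES_KEY column. A minimal driver sketch,
assuming the conf key names follow ImportTsv's documented options; the table
name, input path, and column spec are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.ImportTsv;
    import org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapperForOprAttr;
    import org.apache.hadoop.util.ToolRunner;

    public class ImportTsvWithAttributesDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Route ImportTsv through the custom mapper so each "key=>value" token
        // in the attributes column is applied to the resulting Put.
        conf.set("importtsv.mapper.class",
            TsvImporterCustomTestMapperForOprAttr.class.getName());
        conf.set("importtsv.columns", "HBASE_ROW_KEY,HBASE_ATTRIBUTES_KEY,d:c1");
        int exitCode = ToolRunner.run(conf, new ImportTsv(),
            new String[] { "myTable", "/input/data.tsv" });
        System.exit(exitCode);
      }
    }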

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
new file mode 100644
index 0000000..69c4c7c
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -0,0 +1,1059 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+@Category({ReplicationTests.class, LargeTests.class})
+public class TestReplicationSmallTests extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
+  private static final String PEER_ID = "2";
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r :
+        utility1.getHBaseCluster().getRegionServerThreads()) {
+      utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+    }
+    int rowCount = utility1.countRows(tableName);
+    utility1.deleteTableData(tableName);
+    // Truncating the table sends one Delete per row to the slave cluster
+    // asynchronously. We cannot simply call deleteTableData on utility2,
+    // since late writes from the master could still reach the slave.
+    // Instead, we truncate the first table and wait for all the Deletes
+    // to arrive on the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(rowCount);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Verify that version and column delete marker types are replicated
+   * correctly.
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testDeleteTypes() throws Exception {
+    LOG.info("testDeleteTypes");
+    final byte[] v1 = Bytes.toBytes("v1");
+    final byte[] v2 = Bytes.toBytes("v2");
+    final byte[] v3 = Bytes.toBytes("v3");
+    htable1 = utility1.getConnection().getTable(tableName);
+
+    long t = EnvironmentEdgeManager.currentTime();
+    // create three versions for "row"
+    Put put = new Put(row);
+    put.addColumn(famName, row, t, v1);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.addColumn(famName, row, t + 1, v2);
+    htable1.put(put);
+
+    put = new Put(row);
+    put.addColumn(famName, row, t + 2, v3);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    get.setMaxVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() < 3) {
+        LOG.info("Rows not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
+        break;
+      }
+    }
+    // place a version delete marker (delete last version)
+    Delete d = new Delete(row);
+    d.addColumn(famName, row, t);
+    htable1.delete(d);
+
+    get = new Get(row);
+    get.setMaxVersions();
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() > 2) {
+        LOG.info("Version not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
+        assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
+        break;
+      }
+    }
+
+    // place a column delete marker
+    d = new Delete(row);
+    d.addColumns(famName, row, t+2);
+    htable1.delete(d);
+
+    // now *both* of the remaining versions should be deleted
+    // at the replica
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Rows not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Add a row, check it's replicated, delete it, check it's gone
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testSimplePutDelete() throws Exception {
+    LOG.info("testSimplePutDelete");
+    Put put = new Put(row);
+    put.addColumn(famName, row, row);
+
+    htable1 = utility1.getConnection().getTable(tableName);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+
+    Delete del = new Delete(row);
+    htable1.delete(del);
+
+    get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for del replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        LOG.info("Row not deleted");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Try a small batch upload using the write buffer, check it's replicated
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testSmallBatch() throws Exception {
+    LOG.info("testSmallBatch");
+    // normal Batch tests
+    loadData("", row);
+
+    Scan scan = new Scan();
+
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    scanner1.close();
+    assertEquals(NB_ROWS_IN_BATCH, res1.length);
+
+    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+  }
+
+  private void waitForReplication(int expectedRows, int retries)
+      throws IOException, InterruptedException {
+    Scan scan;
+    for (int i = 0; i < retries; i++) {
+      scan = new Scan();
+      if (i == retries - 1) {
+        fail("Waited too much time for normal batch replication");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(expectedRows);
+      scanner.close();
+      if (res.length != expectedRows) {
+        LOG.info("Only got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void loadData(String prefix, byte[] row) throws IOException {
+    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
+    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
+      put.addColumn(famName, row, row);
+      puts.add(put);
+    }
+    htable1.put(puts);
+  }
+
+  /**
+   * Test disabling and enabling replication: insert while the peer is
+   * disabled and make sure nothing is replicated, then enable the peer and
+   * verify the insert is replicated.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testDisableEnable() throws Exception {
+
+    // Test disabling replication
+    admin.disablePeer(PEER_ID);
+
+    byte[] rowkey = Bytes.toBytes("disable enable");
+    Put put = new Put(rowkey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowkey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Replication wasn't disabled");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+
+    // Test enable replication
+    admin.enablePeer(PEER_ID);
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        return;
+      }
+    }
+    fail("Waited too much time for put replication");
+  }
+
+  /**
+   * Integration test for TestReplicationAdmin: removes and re-adds a peer
+   * cluster.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testAddAndRemoveClusters() throws Exception {
+    LOG.info("testAddAndRemoveClusters");
+    admin.removePeer(PEER_ID);
+    Thread.sleep(SLEEP_TIME);
+    byte[] rowKey = Bytes.toBytes("Won't be replicated");
+    Put put = new Put(rowKey);
+    put.addColumn(famName, row, row);
+    htable1.put(put);
+
+    Get get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES-1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated");
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+    rpc.setClusterKey(utility2.getClusterKey());
+    admin.addPeer(PEER_ID, rpc, null);
+    Thread.sleep(SLEEP_TIME);
+    rowKey = Bytes.toBytes("do rep");
+    put = new Put(rowKey);
+    put.addColumn(famName, row, row);
+    LOG.info("Adding new row");
+    htable1.put(put);
+
+    get = new Get(rowKey);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i==NB_RETRIES-1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.isEmpty()) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME*i);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Do a more intense version of testSmallBatch, one that will trigger
+   * WAL rolling and other non-trivial code paths
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testLoading() throws Exception {
+    LOG.info("Writing out rows to table1 in testLoading");
+    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
+    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
+      Put put = new Put(Bytes.toBytes(i));
+      put.addColumn(famName, row, row);
+      puts.add(put);
+    }
+    // The puts will be iterated through and flushed only when the buffer
+    // size is reached.
+    htable1.put(puts);
+
+    Scan scan = new Scan();
+
+    ResultScanner scanner = htable1.getScanner(scan);
+    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+    scanner.close();
+
+    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
+
+    LOG.info("Looking in table2 for replicated rows in testLoading");
+    long start = System.currentTimeMillis();
+    // Retry more than NB_RETRIES.  As it was, retries were done in 5 seconds and we'd fail
+    // sometimes.
+    final long retries = NB_RETRIES * 10;
+    for (int i = 0; i < retries; i++) {
+      scan = new Scan();
+      scanner = htable2.getScanner(scan);
+      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != NB_ROWS_IN_BIG_BATCH) {
+        if (i == retries - 1) {
+          int lastRow = -1;
+          for (Result result : res) {
+            int currentRow = Bytes.toInt(result.getRow());
+            for (int row = lastRow+1; row < currentRow; row++) {
+              LOG.error("Row missing: " + row);
+            }
+            lastRow = currentRow;
+          }
+          LOG.error("Last row: " + lastRow);
+          fail("Waited too much time for normal batch replication, " +
+            res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
+            (System.currentTimeMillis() - start) + "ms");
+        } else {
+          LOG.info("Only got " + res.length + " rows... retrying");
+          Thread.sleep(SLEEP_TIME);
+        }
+      } else {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Do a small loading into a table, make sure the data is really the same,
+   * then run the VerifyReplication job to check the results. Do a second
+   * comparison where all the cells are different.
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testVerifyRepJob() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    testSmallBatch();
+
+    String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+          Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+    runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+  }
+
+  /**
+   * Load a row into a table, make sure the data is really the same,
+   * delete the row, make sure the delete marker is replicated,
+   * run verify replication with and without raw to check the results.
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testVerifyRepJobWithRawOptions() throws Exception {
+    LOG.info(name.getMethodName());
+
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    byte[] familyname = Bytes.toBytes("fam_raw");
+    byte[] row = Bytes.toBytes("row_raw");
+
+    Table lHtable1 = null;
+    Table lHtable2 = null;
+
+    try {
+      HTableDescriptor table = new HTableDescriptor(tableName);
+      HColumnDescriptor fam = new HColumnDescriptor(familyname);
+      fam.setMaxVersions(100);
+      fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+      table.addFamily(fam);
+      scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      for (HColumnDescriptor f : table.getColumnFamilies()) {
+        scopes.put(f.getName(), f.getScope());
+      }
+
+      Connection connection1 = ConnectionFactory.createConnection(conf1);
+      Connection connection2 = ConnectionFactory.createConnection(conf2);
+      try (Admin admin1 = connection1.getAdmin()) {
+        admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      }
+      try (Admin admin2 = connection2.getAdmin()) {
+        admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+      }
+      utility1.waitUntilAllRegionsAssigned(tableName);
+      utility2.waitUntilAllRegionsAssigned(tableName);
+
+      lHtable1 = utility1.getConnection().getTable(tableName);
+      lHtable2 = utility2.getConnection().getTable(tableName);
+
+      Put put = new Put(row);
+      put.addColumn(familyname, row, row);
+      lHtable1.put(put);
+
+      Get get = new Get(row);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i==NB_RETRIES-1) {
+          fail("Waited too much time for put replication");
+        }
+        Result res = lHtable2.get(get);
+        if (res.isEmpty()) {
+          LOG.info("Row not available");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          assertArrayEquals(res.value(), row);
+          break;
+        }
+      }
+
+      Delete del = new Delete(row);
+      lHtable1.delete(del);
+
+      get = new Get(row);
+      for (int i = 0; i < NB_RETRIES; i++) {
+        if (i==NB_RETRIES-1) {
+          fail("Waited too much time for del replication");
+        }
+        Result res = lHtable2.get(get);
+        if (res.size() >= 1) {
+          LOG.info("Row not deleted");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+
+      // Checking verifyReplication for the default behavior.
+      String[] argsWithoutRaw = new String[] {PEER_ID, tableName.getNameAsString()};
+      runVerifyReplication(argsWithoutRaw, 0, 0);
+
+      // Checking verifyReplication with raw
+      String[] argsWithRawAsTrue = new String[] {"--raw", PEER_ID, tableName.getNameAsString()};
+      runVerifyReplication(argsWithRawAsTrue, 1, 0);
+    } finally {
+      if (lHtable1 != null) {
+        lHtable1.close();
+      }
+      if (lHtable2 != null) {
+        lHtable2.close();
+      }
+    }
+  }
+
+  private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(expectedGoodRows, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(expectedBadRows, job.getCounters().
+        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+
+  @Test(timeout=300000)
+  // VerifyReplication should honor versions option
+  public void testHBase14905() throws Exception {
+    // normal Batch tests
+    byte[] qualifierName = Bytes.toBytes("f1");
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+    htable1.put(put);
+
+    Scan scan = new Scan();
+    scan.setMaxVersions(100);
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(1);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      scan = new Scan();
+      scan.setMaxVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(1);
+      scanner1.close();
+      if (res1.length != 1) {
+        LOG.info("Only got " + res1.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+        if (cellNumber != 3) {
+          LOG.info("Only got " + cellNumber + " cells");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+      if (i == NB_RETRIES-1) {
+        fail("Waited too much time for normal batch replication");
+      }
+    }
+
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
+    htable2.put(put);
+    put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+    htable2.put(put);
+
+    scan = new Scan();
+    scan.setMaxVersions(100);
+    scanner1 = htable2.getScanner(scan);
+    res1 = scanner1.next(NB_ROWS_IN_BATCH);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
+
+    String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+    runVerifyReplication(args, 0, 1);
+  }
+
+  @Test(timeout=300000)
+  // VerifyReplication should honor versions option
+  public void testVersionMismatchHBase14905() throws Exception {
+    // normal Batch tests
+    byte[] qualifierName = Bytes.toBytes("f1");
+    Put put = new Put(Bytes.toBytes("r1"));
+    long ts = System.currentTimeMillis();
+    put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
+    htable1.put(put);
+    put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
+    htable1.put(put);
+
+    Scan scan = new Scan();
+    scan.setMaxVersions(100);
+    ResultScanner scanner1 = htable1.getScanner(scan);
+    Result[] res1 = scanner1.next(1);
+    scanner1.close();
+
+    assertEquals(1, res1.length);
+    assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+    for (int i = 0; i < NB_RETRIES; i++) {
+      scan = new Scan();
+      scan.setMaxVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(1);
+      scanner1.close();
+      if (res1.length != 1) {
+        LOG.info("Only got " + res1.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+        if (cellNumber != 3) {
+          LOG.info("Only got " + cellNumber + " cells");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          break;
+        }
+      }
+      if (i == NB_RETRIES-1) {
+        fail("Waited too much time for normal batch replication");
+      }
+    }
+
+    try {
+      // Disabling replication and modifying the particular version of the
+      // cell to validate the feature.
+      admin.disablePeer(PEER_ID);
+      Put put2 = new Put(Bytes.toBytes("r1"));
+      put2.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v99"));
+      htable2.put(put2);
+
+      scan = new Scan();
+      scan.setMaxVersions(100);
+      scanner1 = htable2.getScanner(scan);
+      res1 = scanner1.next(NB_ROWS_IN_BATCH);
+      scanner1.close();
+      assertEquals(1, res1.length);
+      assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+      String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+      runVerifyReplication(args, 0, 1);
+    } finally {
+      admin.enablePeer(PEER_ID);
+    }
+  }
+
+  /**
+   * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
+   * the compaction WALEdit
+   * @throws Exception
+   */
+  @Test(timeout=300000)
+  public void testCompactionWALEdits() throws Exception {
+    WALProtos.CompactionDescriptor compactionDescriptor =
+        WALProtos.CompactionDescriptor.getDefaultInstance();
+    HRegionInfo hri = new HRegionInfo(htable1.getName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
+    Replication.scopeWALEdits(new WALKey(), edit,
+      htable1.getConfiguration(), null);
+  }
+
+  /**
+   * Test for HBASE-8663.
+   * Create new tables with column families enabled for replication, then run
+   * ReplicationAdmin.listReplicated() and verify the table:colfamily pairs. Note:
+   * TestReplicationAdmin is a better place for this testing but it would need mocks.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testVerifyListReplicatedTable() throws Exception {
+    LOG.info("testVerifyListReplicatedTable");
+
+    final String tName = "VerifyListReplicated_";
+    final String colFam = "cf1";
+    final int numOfTables = 3;
+
+    Admin hadmin = utility1.getAdmin();
+
+    // Create Tables
+    for (int i = 0; i < numOfTables; i++) {
+      HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
+      HColumnDescriptor cfd = new HColumnDescriptor(colFam);
+      cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+      ht.addFamily(cfd);
+      hadmin.createTable(ht);
+    }
+
+    // verify the result
+    List<HashMap<String, String>> replicationColFams = admin.listReplicated();
+    int[] match = new int[numOfTables]; // array of 3 with init value of zero
+
+    for (int i = 0; i < replicationColFams.size(); i++) {
+      HashMap<String, String> replicationEntry = replicationColFams.get(i);
+      String tn = replicationEntry.get(ReplicationAdmin.TNAME);
+      if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
+        int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
+        match[m]++; // should only increase once
+      }
+    }
+
+    // check the matching result
+    for (int i = 0; i < match.length; i++) {
+      assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
+    }
+
+    // drop tables
+    for (int i = 0; i < numOfTables; i++) {
+      TableName tableName = TableName.valueOf(tName + i);
+      hadmin.disableTable(tableName);
+      hadmin.deleteTable(tableName);
+    }
+
+    hadmin.close();
+  }
+
+  /**
+   * Test for HBASE-15259: WALEdits marked as replay should not be replicated.
+   */
+  @Test
+  public void testReplicationInReplay() throws Exception {
+    final TableName tableName = htable1.getName();
+
+    HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
+    HRegionInfo hri = region.getRegionInfo();
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) {
+      scopes.put(fam, 1);
+    }
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
+    WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
+    final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
+    final byte[] qualifier = Bytes.toBytes("q");
+    final byte[] value = Bytes.toBytes("v");
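+    // 'true' marks this as a replay WALEdit; replication is expected to skip it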
+    WALEdit edit = new WALEdit(true);
+    long now = EnvironmentEdgeManager.currentTime();
+    edit.add(new KeyValue(rowName, famName, qualifier,
+      now, value));
+    WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
+    wal.append(hri, walKey, edit, true);
+    wal.sync();
+
+    Get get = new Get(rowName);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES-1) {
+        break;
+      }
+      Result res = htable2.get(get);
+      if (res.size() >= 1) {
+        fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
+      } else {
+        LOG.info("Row not replicated, let's wait a bit more...");
+        Thread.sleep(SLEEP_TIME);
+      }
+    }
+  }
+
+  @Test(timeout=300000)
+  public void testVerifyReplicationPrefixFiltering() throws Exception {
+    final byte[] prefixRow = Bytes.toBytes("prefixrow");
+    final byte[] prefixRow2 = Bytes.toBytes("secondrow");
+    loadData("prefixrow", prefixRow);
+    loadData("secondrow", prefixRow2);
+    loadData("aaa", row);
+    loadData("zzz", row);
+    waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
+    String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
+        tableName.getNameAsString()};
+    runVerifyReplication(args, NB_ROWS_IN_BATCH * 2, 0);
+  }
+
+  @Test(timeout = 300000)
+  public void testVerifyReplicationSnapshotArguments() {
+    String[] args =
+        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
+        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
+        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
+        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
+
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+  }
+
+  @Test(timeout = 300000)
+  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    testSmallBatch();
+
+    // Take source and target tables snapshot
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+      new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+    // Take target snapshot
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String temPath1 = utility1.getRandomDir().toString();
+    String temPath2 = "/tmp2";
+
+    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+
+    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+      new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+
+  @Test
+  public void testEmptyWALRecovery() throws Exception {
+    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue
+    for (int i = 0; i < numRs; i++) {
+      Replication replicationService =
+          (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+    }
+
+    // wait for ReplicationSource to start reading from our empty wal
+    waitForLogAdvance(numRs, emptyWalPaths, false);
+
+    // roll the original wal, which enqueues a new wal behind our empty wal
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs, emptyWalPaths, true);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits for the ReplicationSource to start reading from the given paths
+   * @param numRs number of regionservers
+   * @param emptyWalPaths path for each regionserver
+   * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+   */
+  private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+      final boolean invert) throws Exception {
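+    // Poll for up to 10 seconds until every ReplicationSource's current path
+    // matches (or, when invert is set, no longer matches) its empty WAL.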
+    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+            if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
+}
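
Many of the tests above drive VerifyReplication through runVerifyReplication();
the same job can also be launched from a standalone driver. A minimal sketch,
assuming peer id "2" and a placeholder table name:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
    import org.apache.hadoop.util.ToolRunner;

    public class VerifyReplicationDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Compare up to 100 versions of each cell against replication peer "2";
        // the job's GOODROWS/BADROWS counters report the outcome, as asserted
        // in runVerifyReplication() above.
        int exitCode = ToolRunner.run(conf, new VerifyReplication(),
            new String[] { "--versions=100", "2", "myTable" });
        System.exit(exitCode);
      }
    }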

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
new file mode 100644
index 0000000..2e3cb5e
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import static org.apache.hadoop.util.ToolRunner.run;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Ignore
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestExportSnapshot {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  private static final Log LOG = LogFactory.getLog(TestExportSnapshot.class);
+
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  protected final static byte[] FAMILY = Bytes.toBytes("cf");
+
+  @Rule
+  public final TestName testName = new TestName();
+
+  protected TableName tableName;
+  private byte[] emptySnapshotName;
+  private byte[] snapshotName;
+  private int tableNumFiles;
+  private Admin admin;
+
+  public static void setUpBaseConf(Configuration conf) {
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setInt("hbase.client.pause", 250);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    conf.setInt("mapreduce.map.maxattempts", 10);
+    // If a single node has enough failures (default 3), resource manager will blacklist it.
+    // With only 2 nodes and tests injecting faults, we don't want that.
+    conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 100);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(1, 3);
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniMapReduceCluster();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Create a table and take a snapshot of the table used by the export test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    this.admin = TEST_UTIL.getAdmin();
+
+    tableName = TableName.valueOf("testtb-" + testName.getMethodName());
+    snapshotName = Bytes.toBytes("snaptb0-" + testName.getMethodName());
+    emptySnapshotName = Bytes.toBytes("emptySnaptb0-" + testName.getMethodName());
+
+    // create Table
+    createTable();
+
+    // Take an empty snapshot
+    admin.snapshot(emptySnapshotName, tableName);
+
+    // Add some rows
+    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 50, FAMILY);
+    tableNumFiles = admin.getTableRegions(tableName).size();
+
+    // take a snapshot
+    admin.snapshot(snapshotName, tableName);
+  }
+
+  protected void createTable() throws Exception {
+    SnapshotTestingUtils.createPreSplitTable(TEST_UTIL, tableName, 2, FAMILY);
+  }
+
+  protected interface RegionPredicate {
+    boolean evaluate(final HRegionInfo regionInfo);
+  }
+
+  protected RegionPredicate getBypassRegionPredicate() {
+    return null;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.deleteTable(tableName);
+    SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getAdmin());
+    SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+  }
+
+  /**
+   * Verify that the exported snapshot and copied files match the originals.
+   */
+  @Test
+  public void testExportFileSystemState() throws Exception {
+    testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles);
+  }
+
+  @Test
+  public void testExportFileSystemStateWithSkipTmp() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(ExportSnapshot.CONF_SKIP_TMP, true);
+    try {
+      testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles);
+    } finally {
+      TEST_UTIL.getConfiguration().setBoolean(ExportSnapshot.CONF_SKIP_TMP, false);
+    }
+  }
+
+  @Test
+  public void testEmptyExportFileSystemState() throws Exception {
+    testExportFileSystemState(tableName, emptySnapshotName, emptySnapshotName, 0);
+  }
+
+  @Test
+  public void testConsecutiveExports() throws Exception {
+    Path copyDir = getLocalDestinationDir();
+    testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles, copyDir, false);
+    testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles, copyDir, true);
+    removeExportDir(copyDir);
+  }
+
+  @Test
+  public void testExportWithTargetName() throws Exception {
+    final byte[] targetName = Bytes.toBytes("testExportWithTargetName");
+    testExportFileSystemState(tableName, snapshotName, targetName, tableNumFiles);
+  }
+
+  private void testExportFileSystemState(final TableName tableName, final byte[] snapshotName,
+      final byte[] targetName, int filesExpected) throws Exception {
+    testExportFileSystemState(tableName, snapshotName, targetName,
+      filesExpected, getHdfsDestinationDir(), false);
+  }
+
+  protected void testExportFileSystemState(final TableName tableName,
+      final byte[] snapshotName, final byte[] targetName, int filesExpected,
+      Path copyDir, boolean overwrite) throws Exception {
+    testExportFileSystemState(TEST_UTIL.getConfiguration(), tableName, snapshotName, targetName,
+      filesExpected, TEST_UTIL.getDefaultRootDirPath(), copyDir,
+      overwrite, getBypassRegionPredicate(), true);
+  }
+
+  /**
+   * Creates the destination directory, runs the ExportSnapshot tool, and runs some verifications.
+   */
+  protected static void testExportFileSystemState(final Configuration conf, final TableName tableName,
+      final byte[] snapshotName, final byte[] targetName, final int filesExpected,
+      final Path sourceDir, Path copyDir, final boolean overwrite,
+      final RegionPredicate bypassregionPredicate, boolean success) throws Exception {
+    URI hdfsUri = FileSystem.get(conf).getUri();
+    FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+    copyDir = copyDir.makeQualified(fs);
+
+    List<String> opts = new ArrayList<>();
+    opts.add("--snapshot");
+    opts.add(Bytes.toString(snapshotName));
+    opts.add("--copy-to");
+    opts.add(copyDir.toString());
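+    // Reference comparison is intentional: callers exporting under the
+    // original name pass the identical array for both parameters.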
+    if (targetName != snapshotName) {
+      opts.add("--target");
+      opts.add(Bytes.toString(targetName));
+    }
+    if (overwrite) {
+      opts.add("--overwrite");
+    }
+
+    // Export Snapshot
+    int res = run(conf, new ExportSnapshot(), opts.toArray(new String[opts.size()]));
+    assertEquals(success ? 0 : 1, res);
+    if (!success) {
+      final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(targetName));
+      assertFalse(fs.exists(new Path(copyDir, targetDir)));
+      return;
+    }
+
+    // Verify File-System state
+    FileStatus[] rootFiles = fs.listStatus(copyDir);
+    assertEquals(filesExpected > 0 ? 2 : 1, rootFiles.length);
+    for (FileStatus fileStatus: rootFiles) {
+      String name = fileStatus.getPath().getName();
+      assertTrue(fileStatus.isDirectory());
+      assertTrue(name.equals(HConstants.SNAPSHOT_DIR_NAME) ||
+                 name.equals(HConstants.HFILE_ARCHIVE_DIRECTORY));
+    }
+
+    // compare the snapshot metadata and verify the hfiles
+    final FileSystem hdfs = FileSystem.get(hdfsUri, conf);
+    final Path snapshotDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(snapshotName));
+    final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(targetName));
+    verifySnapshotDir(hdfs, new Path(sourceDir, snapshotDir),
+        fs, new Path(copyDir, targetDir));
+    Set<String> snapshotFiles = verifySnapshot(conf, fs, copyDir, tableName,
+      Bytes.toString(targetName), bypassregionPredicate);
+    assertEquals(filesExpected, snapshotFiles.size());
+  }
+
+  /**
+   * Check that ExportSnapshot will succeed if something fails but the retry succeeds.
+   */
+  @Test
+  public void testExportRetry() throws Exception {
+    Path copyDir = getLocalDestinationDir();
+    FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+    copyDir = copyDir.makeQualified(fs);
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(ExportSnapshot.Testing.CONF_TEST_FAILURE, true);
+    conf.setInt(ExportSnapshot.Testing.CONF_TEST_FAILURE_COUNT, 2);
+    conf.setInt("mapreduce.map.maxattempts", 3);
+    testExportFileSystemState(conf, tableName, snapshotName, snapshotName, tableNumFiles,
+        TEST_UTIL.getDefaultRootDirPath(), copyDir, true, getBypassRegionPredicate(), true);
+  }
+
+  /**
+   * Check that ExportSnapshot will fail if we inject failure more times than MR will retry.
+   */
+  @Test
+  public void testExportFailure() throws Exception {
+    Path copyDir = getLocalDestinationDir();
+    FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+    copyDir = copyDir.makeQualified(fs);
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setBoolean(ExportSnapshot.Testing.CONF_TEST_FAILURE, true);
+    conf.setInt(ExportSnapshot.Testing.CONF_TEST_FAILURE_COUNT, 4);
+    conf.setInt("mapreduce.map.maxattempts", 3);
+    testExportFileSystemState(conf, tableName, snapshotName, snapshotName, tableNumFiles,
+        TEST_UTIL.getDefaultRootDirPath(), copyDir, true, getBypassRegionPredicate(), false);
+  }
+
+  /*
+   * Verify that the snapshot folder on file-system 1 matches the one on file-system 2
+   */
+  protected static void verifySnapshotDir(final FileSystem fs1, final Path root1,
+      final FileSystem fs2, final Path root2) throws IOException {
+    assertEquals(listFiles(fs1, root1, root1), listFiles(fs2, root2, root2));
+  }
+
+  protected Set<String> verifySnapshot(final FileSystem fs, final Path rootDir,
+      final TableName tableName, final String snapshotName) throws IOException {
+    return verifySnapshot(TEST_UTIL.getConfiguration(), fs, rootDir, tableName,
+      snapshotName, getBypassRegionPredicate());
+  }
+
+  /*
+   * Verify that the referenced files exist
+   */
+  protected static Set<String> verifySnapshot(final Configuration conf, final FileSystem fs,
+      final Path rootDir, final TableName tableName, final String snapshotName,
+      final RegionPredicate bypassregionPredicate) throws IOException {
+    final Path exportedSnapshot = new Path(rootDir,
+      new Path(HConstants.SNAPSHOT_DIR_NAME, snapshotName));
+    final Set<String> snapshotFiles = new HashSet<>();
+    final Path exportedArchive = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
+    SnapshotReferenceUtil.visitReferencedFiles(conf, fs, exportedSnapshot,
+          new SnapshotReferenceUtil.SnapshotVisitor() {
+        @Override
+        public void storeFile(final HRegionInfo regionInfo, final String family,
+            final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+          if (bypassregionPredicate != null && bypassregionPredicate.evaluate(regionInfo)) {
+            return;
+          }
+
+          String hfile = storeFile.getName();
+          snapshotFiles.add(hfile);
+          if (storeFile.hasReference()) {
+            // Nothing to do here; the reference is already embedded.
+          } else {
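+            // A non-reference hfile must have been copied, non-empty, into the exported archive.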
+            verifyNonEmptyFile(new Path(exportedArchive,
+              new Path(FSUtils.getTableDir(new Path("./"), tableName),
+                  new Path(regionInfo.getEncodedName(), new Path(family, hfile)))));
+          }
+        }
+
+        private void verifyNonEmptyFile(final Path path) throws IOException {
+          assertTrue(path + " should exist", fs.exists(path));
+          assertTrue(path + " should not be empty", fs.getFileStatus(path).getLen() > 0);
+        }
+    });
+
+    // Verify Snapshot description
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, exportedSnapshot);
+    assertEquals(snapshotName, desc.getName());
+    assertEquals(tableName.getNameAsString(), desc.getTable());
+    return snapshotFiles;
+  }
+
+  private static Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+      throws IOException {
+    Set<String> files = new HashSet<>();
+    int rootPrefix = root.makeQualified(fs).toString().length();
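+    // Store paths relative to the root so listings from different filesystems compare equal.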
+    FileStatus[] list = FSUtils.listStatus(fs, dir);
+    if (list != null) {
+      for (FileStatus fstat: list) {
+        LOG.debug(fstat.getPath());
+        if (fstat.isDirectory()) {
+          files.addAll(listFiles(fs, root, fstat.getPath()));
+        } else {
+          files.add(fstat.getPath().makeQualified(fs).toString().substring(rootPrefix));
+        }
+      }
+    }
+    return files;
+  }
+
+  private Path getHdfsDestinationDir() {
+    Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+    Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis());
+    LOG.info("HDFS export destination path: " + path);
+    return path;
+  }
+
+  private Path getLocalDestinationDir() {
+    Path path = TEST_UTIL.getDataTestDir("local-export-" + System.currentTimeMillis());
+    LOG.info("Local export destination path: " + path);
+    return path;
+  }
+
+  private static void removeExportDir(final Path path) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
+    fs.delete(path, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
new file mode 100644
index 0000000..e31e81e
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Export Snapshot Tool helpers
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestExportSnapshotHelpers {
+  /**
+   * Verify the result of the getBalancedSplits() method.
+   * The result is a list of file groups, used as the input lists for the "export" mappers.
+   * All the groups should hold a similar amount of data.
+   *
+   * The input is a list of (file path, length) pairs.
+   * getBalancedSplits() sorts it by length and assigns a file to each group in turn,
+   * going back and forth through the groups.
+   */
+  @Test
+  public void testBalanceSplit() throws Exception {
+    // Create a list of files
+    List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(21);
+    for (long i = 0; i <= 20; i++) {
+      SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
+        .setType(SnapshotFileInfo.Type.HFILE)
+        .setHfile("file-" + i)
+        .build();
+      files.add(new Pair<>(fileInfo, i));
+    }
+
+    // Create 5 groups (total size 210)
+    //    group 0: 20, 11, 10,  1,  0 (total size: 42)
+    //    group 1: 19, 12,  9,  2 (total size: 42)
+    //    group 2: 18, 13,  8,  3 (total size: 42)
+    //    group 3: 17, 14,  7,  4 (total size: 42)
+    //    group 4: 16, 15,  6,  5 (total size: 42)
+    List<List<Pair<SnapshotFileInfo, Long>>> splits = ExportSnapshot.getBalancedSplits(files, 5);
+    assertEquals(5, splits.size());
+
+    String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", "file-0"};
+    verifyBalanceSplit(splits.get(0), split0, 42);
+    String[] split1 = new String[] {"file-19", "file-12", "file-9",  "file-2"};
+    verifyBalanceSplit(splits.get(1), split1, 42);
+    String[] split2 = new String[] {"file-18", "file-13", "file-8",  "file-3"};
+    verifyBalanceSplit(splits.get(2), split2, 42);
+    String[] split3 = new String[] {"file-17", "file-14", "file-7",  "file-4"};
+    verifyBalanceSplit(splits.get(3), split3, 42);
+    String[] split4 = new String[] {"file-16", "file-15", "file-6",  "file-5"};
+    verifyBalanceSplit(splits.get(4), split4, 42);
+  }
+
+  private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split,
+      final String[] expected, final long expectedSize) {
+    assertEquals(expected.length, split.size());
+    long totalSize = 0;
+    for (int i = 0; i < expected.length; ++i) {
+      Pair<SnapshotFileInfo, Long> fileInfo = split.get(i);
+      assertEquals(expected[i], fileInfo.getFirst().getHfile());
+      totalSize += fileInfo.getSecond();
+    }
+    assertEquals(expectedSize, totalSize);
+  }
+}
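
For reference, here is a minimal sketch of the serpentine assignment described in
the testBalanceSplit Javadoc above: sort the (file, length) pairs by length
descending, then deal the files out across the groups, reversing direction at
each end. This is only an illustration of the expected behavior, not the actual
ExportSnapshot.getBalancedSplits() implementation:

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.hbase.util.Pair;

  static <T> List<List<Pair<T, Long>>> serpentineSplits(
      List<Pair<T, Long>> files, int ngroups) {
    // Largest files first, so every group starts with one of the big files.
    files.sort((a, b) -> Long.compare(b.getSecond(), a.getSecond()));
    List<List<Pair<T, Long>>> groups = new ArrayList<>(ngroups);
    for (int i = 0; i < ngroups; i++) {
      groups.add(new ArrayList<>());
    }
    int g = 0;
    int dir = 1;
    for (Pair<T, Long> file : files) {
      groups.get(g).add(file);
      g += dir;
      if (g == ngroups || g < 0) {
        // Bounce at either end; the edge group receives two files in a row.
        dir = -dir;
        g += dir;
      }
    }
    return groups;
  }

Dealing largest-first and bouncing at the ends pairs the biggest and smallest
remaining files (20 with 11, 19 with 12, ...), which is what produces the
equal-sized groups asserted above.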

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java
new file mode 100644
index 0000000..00778502
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Category({MapReduceTests.class, MediumTests.class})
+public class TestExportSnapshotNoCluster {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  private static final Log LOG = LogFactory.getLog(TestExportSnapshotNoCluster.class);
+
+  protected final static HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
+
+  private static FileSystem fs;
+  private static Path testDir;
+
+  public static void setUpBaseConf(Configuration conf) {
+    conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+    conf.setInt("hbase.regionserver.msginterval", 100);
+    conf.setInt("hbase.client.pause", 250);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+    conf.setInt("mapreduce.map.maxattempts", 10);
+    conf.set(HConstants.HBASE_DIR, testDir.toString());
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    testDir = TEST_UTIL.getDataTestDir();
+    fs = testDir.getFileSystem(TEST_UTIL.getConfiguration());
+
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+  }
+
+  /**
+   * Mock a snapshot with files in the archive dir,
+   * two regions, and one reference file.
+   */
+  @Test
+  public void testSnapshotWithRefsExportFileSystemState() throws Exception {
+    SnapshotMock snapshotMock = new SnapshotMock(TEST_UTIL.getConfiguration(), fs, testDir);
+    SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2("tableWithRefsV1",
+      "tableWithRefsV1");
+    testSnapshotWithRefsExportFileSystemState(builder);
+
+    snapshotMock = new SnapshotMock(TEST_UTIL.getConfiguration(), fs, testDir);
+    builder = snapshotMock.createSnapshotV2("tableWithRefsV2", "tableWithRefsV2");
+    testSnapshotWithRefsExportFileSystemState(builder);
+  }
+
+  /**
+   * Generates a couple of regions for the specified SnapshotMock,
+   * then runs the export and verifies the result.
+   */
+  private void testSnapshotWithRefsExportFileSystemState(SnapshotMock.SnapshotBuilder builder)
+      throws Exception {
+    Path[] r1Files = builder.addRegion();
+    Path[] r2Files = builder.addRegion();
+    builder.commit();
+    int snapshotFilesCount = r1Files.length + r2Files.length;
+
+    byte[] snapshotName = Bytes.toBytes(builder.getSnapshotDescription().getName());
+    TableName tableName = builder.getTableDescriptor().getTableName();
+    TestExportSnapshot.testExportFileSystemState(TEST_UTIL.getConfiguration(),
+      tableName, snapshotName, snapshotName, snapshotFilesCount,
+      testDir, getDestinationDir(), false, null, true);
+  }
+
+  private Path getDestinationDir() {
+    Path path = new Path(new Path(testDir, "export-test"), "export-" + System.currentTimeMillis());
+    LOG.info("HDFS export destination path: " + path);
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java
new file mode 100644
index 0000000..7407a7d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Ignore
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+public class TestMobExportSnapshot extends TestExportSnapshot {
+
+  public static void setUpBaseConf(Configuration conf) {
+    TestExportSnapshot.setUpBaseConf(conf);
+    conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+    TEST_UTIL.startMiniCluster(1, 3);
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  @Override
+  protected void createTable() throws Exception {
+    MobSnapshotTestingUtils.createPreSplitMobTable(TEST_UTIL, tableName, 2, FAMILY);
+  }
+
+  @Override
+  protected RegionPredicate getBypassRegionPredicate() {
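+    // Exclude the special MOB region from file verification.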
+    return new RegionPredicate() {
+      @Override
+      public boolean evaluate(final HRegionInfo regionInfo) {
+        return MobUtils.isMobRegionInfo(regionInfo);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
new file mode 100644
index 0000000..98d03c0
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestMobExportSnapshot using ExportSnapshot in secure mode.
+ */
+@Ignore
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+public class TestMobSecureExportSnapshot extends TestMobExportSnapshot {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+    // Setup separate test-data directory for MR cluster and set corresponding configurations.
+    // Otherwise, different test classes running MR cluster can step on each other.
+    TEST_UTIL.getDataTestDir();
+
+    // set the always on security provider
+    UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+
+    // setup configuration
+    SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+
+    TEST_UTIL.startMiniCluster(1, 3);
+    TEST_UTIL.startMiniMapReduceCluster();
+
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
new file mode 100644
index 0000000..7d4832c
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+
+/**
+ * Reruns TestExportSnapshot using ExportSnapshot in secure mode.
+ */
+@Ignore
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+public class TestSecureExportSnapshot extends TestExportSnapshot {
+  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+      withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setUpBaseConf(TEST_UTIL.getConfiguration());
+    // Setup separate test-data directory for MR cluster and set corresponding configurations.
+    // Otherwise, different test classes running MR cluster can step on each other.
+    TEST_UTIL.getDataTestDir();
+
+    // set the always on security provider
+    UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+
+    // setup configuration
+    SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+
+    TEST_UTIL.startMiniCluster(1, 3);
+    TEST_UTIL.startMiniMapReduceCluster();
+
+    // Wait for the ACL table to become available
+    TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+  }
+}