You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:28 UTC
[28/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
new file mode 100644
index 0000000..a9da98b
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapperForOprAttr.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Just shows a simple example of how the attributes can be extracted and added
+ * to the puts
+ */
+public class TsvImporterCustomTestMapperForOprAttr extends TsvImporterMapper {
+ @Override
+ protected void populatePut(byte[] lineBytes, ParsedLine parsed, Put put, int i)
+ throws BadTsvLineException, IOException {
+ KeyValue kv;
+ kv = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+ parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+ parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, parsed.getColumnOffset(i),
+ parsed.getColumnLength(i));
+ if (parsed.getIndividualAttributes() != null) {
+ String[] attributes = parsed.getIndividualAttributes();
+ for (String attr : attributes) {
+ String[] split = attr.split(ImportTsv.DEFAULT_ATTRIBUTES_SEPERATOR);
+ if (split == null || split.length <= 1) {
+ throw new BadTsvLineException("Invalid attributes seperator specified" + attributes);
+ } else {
+ if (split[0].length() <= 0 || split[1].length() <= 0) {
+ throw new BadTsvLineException("Invalid attributes seperator specified" + attributes);
+ }
+ put.setAttribute(split[0], Bytes.toBytes(split[1]));
+ }
+ }
+ }
+ put.add(kv);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
new file mode 100644
index 0000000..69c4c7c
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -0,0 +1,1059 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+@Category({ReplicationTests.class, LargeTests.class})
+public class TestReplicationSmallTests extends TestReplicationBase {
+
+ private static final Log LOG = LogFactory.getLog(TestReplicationSmallTests.class);
+ private static final String PEER_ID = "2";
+
+ @Rule
+ public TestName name = new TestName();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Starting and stopping replication can make us miss new logs,
+ // rolling like this makes sure the most recent one gets added to the queue
+ for ( JVMClusterUtil.RegionServerThread r :
+ utility1.getHBaseCluster().getRegionServerThreads()) {
+ utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+ }
+ int rowCount = utility1.countRows(tableName);
+ utility1.deleteTableData(tableName);
+ // truncating the table will send one Delete per row to the slave cluster
+ // in an async fashion, which is why we cannot just call deleteTableData on
+ // utility2 since late writes could make it to the slave in some way.
+ // Instead, we truncate the first table and wait for all the Deletes to
+ // make it to the slave.
+ Scan scan = new Scan();
+ int lastCount = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for truncate");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(rowCount);
+ scanner.close();
+ if (res.length != 0) {
+ if (res.length < lastCount) {
+ i--; // Don't increment timeout if we make progress
+ }
+ lastCount = res.length;
+ LOG.info("Still got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Verify that version and column delete marker types are replicated
+ * correctly.
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testDeleteTypes() throws Exception {
+ LOG.info("testDeleteTypes");
+ final byte[] v1 = Bytes.toBytes("v1");
+ final byte[] v2 = Bytes.toBytes("v2");
+ final byte[] v3 = Bytes.toBytes("v3");
+ htable1 = utility1.getConnection().getTable(tableName);
+
+ long t = EnvironmentEdgeManager.currentTime();
+ // create three versions for "row"
+ Put put = new Put(row);
+ put.addColumn(famName, row, t, v1);
+ htable1.put(put);
+
+ put = new Put(row);
+ put.addColumn(famName, row, t + 1, v2);
+ htable1.put(put);
+
+ put = new Put(row);
+ put.addColumn(famName, row, t + 2, v3);
+ htable1.put(put);
+
+ Get get = new Get(row);
+ get.setMaxVersions();
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() < 3) {
+ LOG.info("Rows not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
+ assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
+ assertArrayEquals(CellUtil.cloneValue(res.rawCells()[2]), v1);
+ break;
+ }
+ }
+ // place a version delete marker (delete last version)
+ Delete d = new Delete(row);
+ d.addColumn(famName, row, t);
+ htable1.delete(d);
+
+ get = new Get(row);
+ get.setMaxVersions();
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() > 2) {
+ LOG.info("Version not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(CellUtil.cloneValue(res.rawCells()[0]), v3);
+ assertArrayEquals(CellUtil.cloneValue(res.rawCells()[1]), v2);
+ break;
+ }
+ }
+
+ // place a column delete marker
+ d = new Delete(row);
+ d.addColumns(famName, row, t+2);
+ htable1.delete(d);
+
+ // now *both* of the remaining version should be deleted
+ // at the replica
+ get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for del replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ LOG.info("Rows not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Add a row, check it's replicated, delete it, check's gone
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testSimplePutDelete() throws Exception {
+ LOG.info("testSimplePutDelete");
+ Put put = new Put(row);
+ put.addColumn(famName, row, row);
+
+ htable1 = utility1.getConnection().getTable(tableName);
+ htable1.put(put);
+
+ Get get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.isEmpty()) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+
+ Delete del = new Delete(row);
+ htable1.delete(del);
+
+ get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for del replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ LOG.info("Row not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Try a small batch upload using the write buffer, check it's replicated
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testSmallBatch() throws Exception {
+ LOG.info("testSmallBatch");
+ // normal Batch tests
+ loadData("", row);
+
+ Scan scan = new Scan();
+
+ ResultScanner scanner1 = htable1.getScanner(scan);
+ Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
+ scanner1.close();
+ assertEquals(NB_ROWS_IN_BATCH, res1.length);
+
+ waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
+ }
+
+ private void waitForReplication(int expectedRows, int retries) throws IOException, InterruptedException {
+ Scan scan;
+ for (int i = 0; i < retries; i++) {
+ scan = new Scan();
+ if (i== retries -1) {
+ fail("Waited too much time for normal batch replication");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(expectedRows);
+ scanner.close();
+ if (res.length != expectedRows) {
+ LOG.info("Only got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ private void loadData(String prefix, byte[] row) throws IOException {
+ List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
+ put.addColumn(famName, row, row);
+ puts.add(put);
+ }
+ htable1.put(puts);
+ }
+
+ /**
+ * Test disable/enable replication, trying to insert, make sure nothing's
+ * replicated, enable it, the insert should be replicated
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testDisableEnable() throws Exception {
+
+ // Test disabling replication
+ admin.disablePeer(PEER_ID);
+
+ byte[] rowkey = Bytes.toBytes("disable enable");
+ Put put = new Put(rowkey);
+ put.addColumn(famName, row, row);
+ htable1.put(put);
+
+ Get get = new Get(rowkey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Replication wasn't disabled");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ // Test enable replication
+ admin.enablePeer(PEER_ID);
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.isEmpty()) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ return;
+ }
+ }
+ fail("Waited too much time for put replication");
+ }
+
+ /**
+ * Integration test for TestReplicationAdmin, removes and re-add a peer
+ * cluster
+ *
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testAddAndRemoveClusters() throws Exception {
+ LOG.info("testAddAndRemoveClusters");
+ admin.removePeer(PEER_ID);
+ Thread.sleep(SLEEP_TIME);
+ byte[] rowKey = Bytes.toBytes("Won't be replicated");
+ Put put = new Put(rowKey);
+ put.addColumn(famName, row, row);
+ htable1.put(put);
+
+ Get get = new Get(rowKey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES-1) {
+ break;
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Not supposed to be replicated");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(utility2.getClusterKey());
+ admin.addPeer(PEER_ID, rpc, null);
+ Thread.sleep(SLEEP_TIME);
+ rowKey = Bytes.toBytes("do rep");
+ put = new Put(rowKey);
+ put.addColumn(famName, row, row);
+ LOG.info("Adding new row");
+ htable1.put(put);
+
+ get = new Get(rowKey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.isEmpty()) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME*i);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+ }
+
+
+ /**
+ * Do a more intense version testSmallBatch, one that will trigger
+ * wal rolling and other non-trivial code paths
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testLoading() throws Exception {
+ LOG.info("Writing out rows to table1 in testLoading");
+ List<Put> puts = new ArrayList<>(NB_ROWS_IN_BIG_BATCH);
+ for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
+ Put put = new Put(Bytes.toBytes(i));
+ put.addColumn(famName, row, row);
+ puts.add(put);
+ }
+ // The puts will be iterated through and flushed only when the buffer
+ // size is reached.
+ htable1.put(puts);
+
+ Scan scan = new Scan();
+
+ ResultScanner scanner = htable1.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+ scanner.close();
+
+ assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);
+
+ LOG.info("Looking in table2 for replicated rows in testLoading");
+ long start = System.currentTimeMillis();
+ // Retry more than NB_RETRIES. As it was, retries were done in 5 seconds and we'd fail
+ // sometimes.
+ final long retries = NB_RETRIES * 10;
+ for (int i = 0; i < retries; i++) {
+ scan = new Scan();
+ scanner = htable2.getScanner(scan);
+ res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+ scanner.close();
+ if (res.length != NB_ROWS_IN_BIG_BATCH) {
+ if (i == retries - 1) {
+ int lastRow = -1;
+ for (Result result : res) {
+ int currentRow = Bytes.toInt(result.getRow());
+ for (int row = lastRow+1; row < currentRow; row++) {
+ LOG.error("Row missing: " + row);
+ }
+ lastRow = currentRow;
+ }
+ LOG.error("Last row: " + lastRow);
+ fail("Waited too much time for normal batch replication, " +
+ res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
+ (System.currentTimeMillis() - start) + "ms");
+ } else {
+ LOG.info("Only got " + res.length + " rows... retrying");
+ Thread.sleep(SLEEP_TIME);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Do a small loading into a table, make sure the data is really the same,
+ * then run the VerifyReplication job to check the results. Do a second
+ * comparison where all the cells are different.
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testVerifyRepJob() throws Exception {
+ // Populate the tables, at the same time it guarantees that the tables are
+ // identical since it does the check
+ testSmallBatch();
+
+ String[] args = new String[] {PEER_ID, tableName.getNameAsString()};
+ runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+
+ Scan scan = new Scan();
+ ResultScanner rs = htable2.getScanner(scan);
+ Put put = null;
+ for (Result result : rs) {
+ put = new Put(result.getRow());
+ Cell firstVal = result.rawCells()[0];
+ put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+ Bytes.toBytes("diff data"));
+ htable2.put(put);
+ }
+ Delete delete = new Delete(put.getRow());
+ htable2.delete(delete);
+ runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+ }
+
+ /**
+ * Load a row into a table, make sure the data is really the same,
+ * delete the row, make sure the delete marker is replicated,
+ * run verify replication with and without raw to check the results.
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testVerifyRepJobWithRawOptions() throws Exception {
+ LOG.info(name.getMethodName());
+
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ byte[] familyname = Bytes.toBytes("fam_raw");
+ byte[] row = Bytes.toBytes("row_raw");
+
+ Table lHtable1 = null;
+ Table lHtable2 = null;
+
+ try {
+ HTableDescriptor table = new HTableDescriptor(tableName);
+ HColumnDescriptor fam = new HColumnDescriptor(familyname);
+ fam.setMaxVersions(100);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ table.addFamily(fam);
+ scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (HColumnDescriptor f : table.getColumnFamilies()) {
+ scopes.put(f.getName(), f.getScope());
+ }
+
+ Connection connection1 = ConnectionFactory.createConnection(conf1);
+ Connection connection2 = ConnectionFactory.createConnection(conf2);
+ try (Admin admin1 = connection1.getAdmin()) {
+ admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ }
+ try (Admin admin2 = connection2.getAdmin()) {
+ admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ }
+ utility1.waitUntilAllRegionsAssigned(tableName);
+ utility2.waitUntilAllRegionsAssigned(tableName);
+
+ lHtable1 = utility1.getConnection().getTable(tableName);
+ lHtable2 = utility2.getConnection().getTable(tableName);
+
+ Put put = new Put(row);
+ put.addColumn(familyname, row, row);
+ lHtable1.put(put);
+
+ Get get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = lHtable2.get(get);
+ if (res.isEmpty()) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+
+ Delete del = new Delete(row);
+ lHtable1.delete(del);
+
+ get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for del replication");
+ }
+ Result res = lHtable2.get(get);
+ if (res.size() >= 1) {
+ LOG.info("Row not deleted");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+
+ // Checking verifyReplication for the default behavior.
+ String[] argsWithoutRaw = new String[] {PEER_ID, tableName.getNameAsString()};
+ runVerifyReplication(argsWithoutRaw, 0, 0);
+
+ // Checking verifyReplication with raw
+ String[] argsWithRawAsTrue = new String[] {"--raw", PEER_ID, tableName.getNameAsString()};
+ runVerifyReplication(argsWithRawAsTrue, 1, 0);
+ } finally {
+ if (lHtable1 != null) {
+ lHtable1.close();
+ }
+ if (lHtable2 != null) {
+ lHtable2.close();
+ }
+ }
+ }
+
+ private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(expectedGoodRows, job.getCounters().
+ findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(expectedBadRows, job.getCounters().
+ findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+ }
+
+ @Test(timeout=300000)
+ // VerifyReplication should honor versions option
+ public void testHBase14905() throws Exception {
+ // normal Batch tests
+ byte[] qualifierName = Bytes.toBytes("f1");
+ Put put = new Put(Bytes.toBytes("r1"));
+ put.addColumn(famName, qualifierName, Bytes.toBytes("v1002"));
+ htable1.put(put);
+ put.addColumn(famName, qualifierName, Bytes.toBytes("v1001"));
+ htable1.put(put);
+ put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+ htable1.put(put);
+
+ Scan scan = new Scan();
+ scan.setMaxVersions(100);
+ ResultScanner scanner1 = htable1.getScanner(scan);
+ Result[] res1 = scanner1.next(1);
+ scanner1.close();
+
+ assertEquals(1, res1.length);
+ assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ scan = new Scan();
+ scan.setMaxVersions(100);
+ scanner1 = htable2.getScanner(scan);
+ res1 = scanner1.next(1);
+ scanner1.close();
+ if (res1.length != 1) {
+ LOG.info("Only got " + res1.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+ if (cellNumber != 3) {
+ LOG.info("Only got " + cellNumber + " cells");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ if (i == NB_RETRIES-1) {
+ fail("Waited too much time for normal batch replication");
+ }
+ }
+
+ put.addColumn(famName, qualifierName, Bytes.toBytes("v1111"));
+ htable2.put(put);
+ put.addColumn(famName, qualifierName, Bytes.toBytes("v1112"));
+ htable2.put(put);
+
+ scan = new Scan();
+ scan.setMaxVersions(100);
+ scanner1 = htable2.getScanner(scan);
+ res1 = scanner1.next(NB_ROWS_IN_BATCH);
+ scanner1.close();
+
+ assertEquals(1, res1.length);
+ assertEquals(5, res1[0].getColumnCells(famName, qualifierName).size());
+
+ String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+ runVerifyReplication(args, 0, 1);
+ }
+
+ @Test(timeout=300000)
+ // VerifyReplication should honor versions option
+ public void testVersionMismatchHBase14905() throws Exception {
+ // normal Batch tests
+ byte[] qualifierName = Bytes.toBytes("f1");
+ Put put = new Put(Bytes.toBytes("r1"));
+ long ts = System.currentTimeMillis();
+ put.addColumn(famName, qualifierName, ts + 1, Bytes.toBytes("v1"));
+ htable1.put(put);
+ put.addColumn(famName, qualifierName, ts + 2, Bytes.toBytes("v2"));
+ htable1.put(put);
+ put.addColumn(famName, qualifierName, ts + 3, Bytes.toBytes("v3"));
+ htable1.put(put);
+
+ Scan scan = new Scan();
+ scan.setMaxVersions(100);
+ ResultScanner scanner1 = htable1.getScanner(scan);
+ Result[] res1 = scanner1.next(1);
+ scanner1.close();
+
+ assertEquals(1, res1.length);
+ assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ scan = new Scan();
+ scan.setMaxVersions(100);
+ scanner1 = htable2.getScanner(scan);
+ res1 = scanner1.next(1);
+ scanner1.close();
+ if (res1.length != 1) {
+ LOG.info("Only got " + res1.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ int cellNumber = res1[0].getColumnCells(famName, Bytes.toBytes("f1")).size();
+ if (cellNumber != 3) {
+ LOG.info("Only got " + cellNumber + " cells");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ if (i == NB_RETRIES-1) {
+ fail("Waited too much time for normal batch replication");
+ }
+ }
+
+ try {
+ // Disabling replication and modifying the particular version of the cell to validate the feature.
+ admin.disablePeer(PEER_ID);
+ Put put2 = new Put(Bytes.toBytes("r1"));
+ put2.addColumn(famName, qualifierName, ts +2, Bytes.toBytes("v99"));
+ htable2.put(put2);
+
+ scan = new Scan();
+ scan.setMaxVersions(100);
+ scanner1 = htable2.getScanner(scan);
+ res1 = scanner1.next(NB_ROWS_IN_BATCH);
+ scanner1.close();
+ assertEquals(1, res1.length);
+ assertEquals(3, res1[0].getColumnCells(famName, qualifierName).size());
+
+ String[] args = new String[] {"--versions=100", PEER_ID, tableName.getNameAsString()};
+ runVerifyReplication(args, 0, 1);
+ }
+ finally {
+ admin.enablePeer(PEER_ID);
+ }
+ }
+
+ /**
+ * Test for HBASE-9038, Replication.scopeWALEdits would NPE if it wasn't filtering out
+ * the compaction WALEdit
+ * @throws Exception
+ */
+ @Test(timeout=300000)
+ public void testCompactionWALEdits() throws Exception {
+ WALProtos.CompactionDescriptor compactionDescriptor =
+ WALProtos.CompactionDescriptor.getDefaultInstance();
+ HRegionInfo hri = new HRegionInfo(htable1.getName(),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
+ Replication.scopeWALEdits(new WALKey(), edit,
+ htable1.getConfiguration(), null);
+ }
+
+ /**
+ * Test for HBASE-8663
+ * Create two new Tables with colfamilies enabled for replication then run
+ * ReplicationAdmin.listReplicated(). Finally verify the table:colfamilies. Note:
+ * TestReplicationAdmin is a better place for this testing but it would need mocks.
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testVerifyListReplicatedTable() throws Exception {
+ LOG.info("testVerifyListReplicatedTable");
+
+ final String tName = "VerifyListReplicated_";
+ final String colFam = "cf1";
+ final int numOfTables = 3;
+
+ Admin hadmin = utility1.getAdmin();
+
+ // Create Tables
+ for (int i = 0; i < numOfTables; i++) {
+ HTableDescriptor ht = new HTableDescriptor(TableName.valueOf(tName + i));
+ HColumnDescriptor cfd = new HColumnDescriptor(colFam);
+ cfd.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ ht.addFamily(cfd);
+ hadmin.createTable(ht);
+ }
+
+ // verify the result
+ List<HashMap<String, String>> replicationColFams = admin.listReplicated();
+ int[] match = new int[numOfTables]; // array of 3 with init value of zero
+
+ for (int i = 0; i < replicationColFams.size(); i++) {
+ HashMap<String, String> replicationEntry = replicationColFams.get(i);
+ String tn = replicationEntry.get(ReplicationAdmin.TNAME);
+ if ((tn.startsWith(tName)) && replicationEntry.get(ReplicationAdmin.CFNAME).equals(colFam)) {
+ int m = Integer.parseInt(tn.substring(tn.length() - 1)); // get the last digit
+ match[m]++; // should only increase once
+ }
+ }
+
+ // check the matching result
+ for (int i = 0; i < match.length; i++) {
+ assertTrue("listReplicated() does not match table " + i, (match[i] == 1));
+ }
+
+ // drop tables
+ for (int i = 0; i < numOfTables; i++) {
+ TableName tableName = TableName.valueOf(tName + i);
+ hadmin.disableTable(tableName);
+ hadmin.deleteTable(tableName);
+ }
+
+ hadmin.close();
+ }
+
+ /**
+ * Test for HBase-15259 WALEdits under replay will also be replicated
+ * */
+ @Test
+ public void testReplicationInReplay() throws Exception {
+ final TableName tableName = htable1.getName();
+
+ HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
+ HRegionInfo hri = region.getRegionInfo();
+ NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) {
+ scopes.put(fam, 1);
+ }
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
+ WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
+ final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
+ final byte[] qualifier = Bytes.toBytes("q");
+ final byte[] value = Bytes.toBytes("v");
+ WALEdit edit = new WALEdit(true);
+ long now = EnvironmentEdgeManager.currentTime();
+ edit.add(new KeyValue(rowName, famName, qualifier,
+ now, value));
+ WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
+ wal.append(hri, walKey, edit, true);
+ wal.sync();
+
+ Get get = new Get(rowName);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES-1) {
+ break;
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Not supposed to be replicated for " + Bytes.toString(res.getRow()));
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+ }
+
+ @Test(timeout=300000)
+ public void testVerifyReplicationPrefixFiltering() throws Exception {
+ final byte[] prefixRow = Bytes.toBytes("prefixrow");
+ final byte[] prefixRow2 = Bytes.toBytes("secondrow");
+ loadData("prefixrow", prefixRow);
+ loadData("secondrow", prefixRow2);
+ loadData("aaa", row);
+ loadData("zzz", row);
+ waitForReplication(NB_ROWS_IN_BATCH * 4, NB_RETRIES * 4);
+ String[] args = new String[] {"--row-prefixes=prefixrow,secondrow", PEER_ID,
+ tableName.getNameAsString()};
+ runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
+ }
+
+ @Test(timeout = 300000)
+ public void testVerifyReplicationSnapshotArguments() {
+ String[] args =
+ new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
+ tableName.getNameAsString() };
+ assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
+ "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
+ tableName.getNameAsString() };
+ assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
+ "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
+ "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
+
+ assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+ }
+
+ @Test(timeout = 300000)
+ public void testVerifyReplicationWithSnapshotSupport() throws Exception {
+ // Populate the tables, at the same time it guarantees that the tables are
+ // identical since it does the check
+ testSmallBatch();
+
+ // Take source and target tables snapshot
+ Path rootDir = FSUtils.getRootDir(conf1);
+ FileSystem fs = rootDir.getFileSystem(conf1);
+ String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+ new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+ // Take target snapshot
+ Path peerRootDir = FSUtils.getRootDir(conf2);
+ FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+ new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+ String peerFSAddress = peerFs.getUri().toString();
+ String temPath1 = utility1.getRandomDir().toString();
+ String temPath2 = "/tmp2";
+
+ String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+ Job job = new VerifyReplication().createSubmittableJob(conf1, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(NB_ROWS_IN_BATCH,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(0,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+ Scan scan = new Scan();
+ ResultScanner rs = htable2.getScanner(scan);
+ Put put = null;
+ for (Result result : rs) {
+ put = new Put(result.getRow());
+ Cell firstVal = result.rawCells()[0];
+ put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+ Bytes.toBytes("diff data"));
+ htable2.put(put);
+ }
+ Delete delete = new Delete(put.getRow());
+ htable2.delete(delete);
+
+ sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+ new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+ peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+ new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+ args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+ job = new VerifyReplication().createSubmittableJob(conf1, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(0,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(NB_ROWS_IN_BATCH,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+ }
+
+ @Test
+ public void testEmptyWALRecovery() throws Exception {
+ final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+ // for each RS, create an empty wal with same walGroupId
+ final List<Path> emptyWalPaths = new ArrayList<>();
+ long ts = System.currentTimeMillis();
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+ Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+ utility1.getTestFileSystem().create(emptyWalPath).close();
+ emptyWalPaths.add(emptyWalPath);
+ }
+
+ // inject our empty wal into the replication queue
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService =
+ (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+ replicationService.preLogRoll(null, emptyWalPaths.get(i));
+ replicationService.postLogRoll(null, emptyWalPaths.get(i));
+ }
+
+ // wait for ReplicationSource to start reading from our empty wal
+ waitForLogAdvance(numRs, emptyWalPaths, false);
+
+ // roll the original wal, which enqueues a new wal behind our empty wal
+ for (int i = 0; i < numRs; i++) {
+ HRegionInfo regionInfo =
+ utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ wal.rollWriter(true);
+ }
+
+ // ReplicationSource should advance past the empty wal, or else the test will fail
+ waitForLogAdvance(numRs, emptyWalPaths, true);
+
+ // we're now writing to the new wal
+ // if everything works, the source should've stopped reading from the empty wal, and start
+ // replicating from the new wal
+ testSimplePutDelete();
+ }
+
+ /**
+ * Waits for the ReplicationSource to start reading from the given paths
+ * @param numRs number of regionservers
+ * @param emptyWalPaths path for each regionserver
+ * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+ */
+ private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+ final boolean invert) throws Exception {
+ Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ for (int i = 0; i < numRs; i++) {
+ Replication replicationService = (Replication) utility1.getHBaseCluster()
+ .getRegionServer(i).getReplicationSourceService();
+ for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+ .getSources()) {
+ ReplicationSource source = (ReplicationSource) rsi;
+ if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
new file mode 100644
index 0000000..2e3cb5e
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshot.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import static org.apache.hadoop.util.ToolRunner.run;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Ignore
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestExportSnapshot {
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+ private static final Log LOG = LogFactory.getLog(TestExportSnapshot.class);
+
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ protected final static byte[] FAMILY = Bytes.toBytes("cf");
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ protected TableName tableName;
+ private byte[] emptySnapshotName;
+ private byte[] snapshotName;
+ private int tableNumFiles;
+ private Admin admin;
+
+ public static void setUpBaseConf(Configuration conf) {
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ conf.setInt("hbase.regionserver.msginterval", 100);
+ conf.setInt("hbase.client.pause", 250);
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+ conf.setInt("mapreduce.map.maxattempts", 10);
+ // If a single node has enough failures (default 3), resource manager will blacklist it.
+ // With only 2 nodes and tests injecting faults, we don't want that.
+ conf.setInt("mapreduce.job.maxtaskfailures.per.tracker", 100);
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setUpBaseConf(TEST_UTIL.getConfiguration());
+ TEST_UTIL.startMiniCluster(1, 3);
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniMapReduceCluster();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Create a table and take a snapshot of the table used by the export test.
+ */
+ @Before
+ public void setUp() throws Exception {
+ this.admin = TEST_UTIL.getAdmin();
+
+ tableName = TableName.valueOf("testtb-" + testName.getMethodName());
+ snapshotName = Bytes.toBytes("snaptb0-" + testName.getMethodName());
+ emptySnapshotName = Bytes.toBytes("emptySnaptb0-" + testName.getMethodName());
+
+ // create Table
+ createTable();
+
+ // Take an empty snapshot
+ admin.snapshot(emptySnapshotName, tableName);
+
+ // Add some rows
+ SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 50, FAMILY);
+ tableNumFiles = admin.getTableRegions(tableName).size();
+
+ // take a snapshot
+ admin.snapshot(snapshotName, tableName);
+ }
+
+ protected void createTable() throws Exception {
+ SnapshotTestingUtils.createPreSplitTable(TEST_UTIL, tableName, 2, FAMILY);
+ }
+
+ protected interface RegionPredicate {
+ boolean evaluate(final HRegionInfo regionInfo);
+ }
+
+ protected RegionPredicate getBypassRegionPredicate() {
+ return null;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.deleteTable(tableName);
+ SnapshotTestingUtils.deleteAllSnapshots(TEST_UTIL.getAdmin());
+ SnapshotTestingUtils.deleteArchiveDirectory(TEST_UTIL);
+ }
+
+ /**
+ * Verify if exported snapshot and copied files matches the original one.
+ */
+ @Test
+ public void testExportFileSystemState() throws Exception {
+ testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles);
+ }
+
+ @Test
+ public void testExportFileSystemStateWithSkipTmp() throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(ExportSnapshot.CONF_SKIP_TMP, true);
+ try {
+ testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles);
+ } finally {
+ TEST_UTIL.getConfiguration().setBoolean(ExportSnapshot.CONF_SKIP_TMP, false);
+ }
+ }
+
+ @Test
+ public void testEmptyExportFileSystemState() throws Exception {
+ testExportFileSystemState(tableName, emptySnapshotName, emptySnapshotName, 0);
+ }
+
+ @Test
+ public void testConsecutiveExports() throws Exception {
+ Path copyDir = getLocalDestinationDir();
+ testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles, copyDir, false);
+ testExportFileSystemState(tableName, snapshotName, snapshotName, tableNumFiles, copyDir, true);
+ removeExportDir(copyDir);
+ }
+
+ @Test
+ public void testExportWithTargetName() throws Exception {
+ final byte[] targetName = Bytes.toBytes("testExportWithTargetName");
+ testExportFileSystemState(tableName, snapshotName, targetName, tableNumFiles);
+ }
+
+ private void testExportFileSystemState(final TableName tableName, final byte[] snapshotName,
+ final byte[] targetName, int filesExpected) throws Exception {
+ testExportFileSystemState(tableName, snapshotName, targetName,
+ filesExpected, getHdfsDestinationDir(), false);
+ }
+
+ protected void testExportFileSystemState(final TableName tableName,
+ final byte[] snapshotName, final byte[] targetName, int filesExpected,
+ Path copyDir, boolean overwrite) throws Exception {
+ testExportFileSystemState(TEST_UTIL.getConfiguration(), tableName, snapshotName, targetName,
+ filesExpected, TEST_UTIL.getDefaultRootDirPath(), copyDir,
+ overwrite, getBypassRegionPredicate(), true);
+ }
+
+ /**
+ * Creates destination directory, runs ExportSnapshot() tool, and runs some verifications.
+ */
+ protected static void testExportFileSystemState(final Configuration conf, final TableName tableName,
+ final byte[] snapshotName, final byte[] targetName, final int filesExpected,
+ final Path sourceDir, Path copyDir, final boolean overwrite,
+ final RegionPredicate bypassregionPredicate, boolean success) throws Exception {
+ URI hdfsUri = FileSystem.get(conf).getUri();
+ FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+ copyDir = copyDir.makeQualified(fs);
+
+ List<String> opts = new ArrayList<>();
+ opts.add("--snapshot");
+ opts.add(Bytes.toString(snapshotName));
+ opts.add("--copy-to");
+ opts.add(copyDir.toString());
+ if (targetName != snapshotName) {
+ opts.add("--target");
+ opts.add(Bytes.toString(targetName));
+ }
+ if (overwrite) opts.add("--overwrite");
+
+ // Export Snapshot
+ int res = run(conf, new ExportSnapshot(), opts.toArray(new String[opts.size()]));
+ assertEquals(success ? 0 : 1, res);
+ if (!success) {
+ final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(targetName));
+ assertFalse(fs.exists(new Path(copyDir, targetDir)));
+ return;
+ }
+
+ // Verify File-System state
+ FileStatus[] rootFiles = fs.listStatus(copyDir);
+ assertEquals(filesExpected > 0 ? 2 : 1, rootFiles.length);
+ for (FileStatus fileStatus: rootFiles) {
+ String name = fileStatus.getPath().getName();
+ assertTrue(fileStatus.isDirectory());
+ assertTrue(name.equals(HConstants.SNAPSHOT_DIR_NAME) ||
+ name.equals(HConstants.HFILE_ARCHIVE_DIRECTORY));
+ }
+
+ // compare the snapshot metadata and verify the hfiles
+ final FileSystem hdfs = FileSystem.get(hdfsUri, conf);
+ final Path snapshotDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(snapshotName));
+ final Path targetDir = new Path(HConstants.SNAPSHOT_DIR_NAME, Bytes.toString(targetName));
+ verifySnapshotDir(hdfs, new Path(sourceDir, snapshotDir),
+ fs, new Path(copyDir, targetDir));
+ Set<String> snapshotFiles = verifySnapshot(conf, fs, copyDir, tableName,
+ Bytes.toString(targetName), bypassregionPredicate);
+ assertEquals(filesExpected, snapshotFiles.size());
+ }
+
+ /**
+ * Check that ExportSnapshot will succeed if something fails but the retry succeed.
+ */
+ @Test
+ public void testExportRetry() throws Exception {
+ Path copyDir = getLocalDestinationDir();
+ FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+ copyDir = copyDir.makeQualified(fs);
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setBoolean(ExportSnapshot.Testing.CONF_TEST_FAILURE, true);
+ conf.setInt(ExportSnapshot.Testing.CONF_TEST_FAILURE_COUNT, 2);
+ conf.setInt("mapreduce.map.maxattempts", 3);
+ testExportFileSystemState(conf, tableName, snapshotName, snapshotName, tableNumFiles,
+ TEST_UTIL.getDefaultRootDirPath(), copyDir, true, getBypassRegionPredicate(), true);
+ }
+
+ /**
+ * Check that ExportSnapshot will fail if we inject failure more times than MR will retry.
+ */
+ @Test
+ public void testExportFailure() throws Exception {
+ Path copyDir = getLocalDestinationDir();
+ FileSystem fs = FileSystem.get(copyDir.toUri(), new Configuration());
+ copyDir = copyDir.makeQualified(fs);
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setBoolean(ExportSnapshot.Testing.CONF_TEST_FAILURE, true);
+ conf.setInt(ExportSnapshot.Testing.CONF_TEST_FAILURE_COUNT, 4);
+ conf.setInt("mapreduce.map.maxattempts", 3);
+ testExportFileSystemState(conf, tableName, snapshotName, snapshotName, tableNumFiles,
+ TEST_UTIL.getDefaultRootDirPath(), copyDir, true, getBypassRegionPredicate(), false);
+ }
+
+ /*
+ * verify if the snapshot folder on file-system 1 match the one on file-system 2
+ */
+ protected static void verifySnapshotDir(final FileSystem fs1, final Path root1,
+ final FileSystem fs2, final Path root2) throws IOException {
+ assertEquals(listFiles(fs1, root1, root1), listFiles(fs2, root2, root2));
+ }
+
+ protected Set<String> verifySnapshot(final FileSystem fs, final Path rootDir,
+ final TableName tableName, final String snapshotName) throws IOException {
+ return verifySnapshot(TEST_UTIL.getConfiguration(), fs, rootDir, tableName,
+ snapshotName, getBypassRegionPredicate());
+ }
+
+ /*
+ * Verify if the files exists
+ */
+ protected static Set<String> verifySnapshot(final Configuration conf, final FileSystem fs,
+ final Path rootDir, final TableName tableName, final String snapshotName,
+ final RegionPredicate bypassregionPredicate) throws IOException {
+ final Path exportedSnapshot = new Path(rootDir,
+ new Path(HConstants.SNAPSHOT_DIR_NAME, snapshotName));
+ final Set<String> snapshotFiles = new HashSet<>();
+ final Path exportedArchive = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY);
+ SnapshotReferenceUtil.visitReferencedFiles(conf, fs, exportedSnapshot,
+ new SnapshotReferenceUtil.SnapshotVisitor() {
+ @Override
+ public void storeFile(final HRegionInfo regionInfo, final String family,
+ final SnapshotRegionManifest.StoreFile storeFile) throws IOException {
+ if (bypassregionPredicate != null && bypassregionPredicate.evaluate(regionInfo))
+ return;
+
+ String hfile = storeFile.getName();
+ snapshotFiles.add(hfile);
+ if (storeFile.hasReference()) {
+ // Nothing to do here, we have already the reference embedded
+ } else {
+ verifyNonEmptyFile(new Path(exportedArchive,
+ new Path(FSUtils.getTableDir(new Path("./"), tableName),
+ new Path(regionInfo.getEncodedName(), new Path(family, hfile)))));
+ }
+ }
+
+ private void verifyNonEmptyFile(final Path path) throws IOException {
+ assertTrue(path + " should exists", fs.exists(path));
+ assertTrue(path + " should not be empty", fs.getFileStatus(path).getLen() > 0);
+ }
+ });
+
+ // Verify Snapshot description
+ SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, exportedSnapshot);
+ assertTrue(desc.getName().equals(snapshotName));
+ assertTrue(desc.getTable().equals(tableName.getNameAsString()));
+ return snapshotFiles;
+ }
+
+ private static Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+ throws IOException {
+ Set<String> files = new HashSet<>();
+ int rootPrefix = root.makeQualified(fs).toString().length();
+ FileStatus[] list = FSUtils.listStatus(fs, dir);
+ if (list != null) {
+ for (FileStatus fstat: list) {
+ LOG.debug(fstat.getPath());
+ if (fstat.isDirectory()) {
+ files.addAll(listFiles(fs, root, fstat.getPath()));
+ } else {
+ files.add(fstat.getPath().makeQualified(fs).toString().substring(rootPrefix));
+ }
+ }
+ }
+ return files;
+ }
+
+ private Path getHdfsDestinationDir() {
+ Path rootDir = TEST_UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ Path path = new Path(new Path(rootDir, "export-test"), "export-" + System.currentTimeMillis());
+ LOG.info("HDFS export destination path: " + path);
+ return path;
+ }
+
+ private Path getLocalDestinationDir() {
+ Path path = TEST_UTIL.getDataTestDir("local-export-" + System.currentTimeMillis());
+ LOG.info("Local export destination path: " + path);
+ return path;
+ }
+
+ private static void removeExportDir(final Path path) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
+ fs.delete(path, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
new file mode 100644
index 0000000..e31e81e
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotHelpers.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Export Snapshot Tool helpers
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestExportSnapshotHelpers {
+ /**
+ * Verfy the result of getBalanceSplits() method.
+ * The result are groups of files, used as input list for the "export" mappers.
+ * All the groups should have similar amount of data.
+ *
+ * The input list is a pair of file path and length.
+ * The getBalanceSplits() function sort it by length,
+ * and assign to each group a file, going back and forth through the groups.
+ */
+ @Test
+ public void testBalanceSplit() throws Exception {
+ // Create a list of files
+ List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(21);
+ for (long i = 0; i <= 20; i++) {
+ SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder()
+ .setType(SnapshotFileInfo.Type.HFILE)
+ .setHfile("file-" + i)
+ .build();
+ files.add(new Pair<>(fileInfo, i));
+ }
+
+ // Create 5 groups (total size 210)
+ // group 0: 20, 11, 10, 1 (total size: 42)
+ // group 1: 19, 12, 9, 2 (total size: 42)
+ // group 2: 18, 13, 8, 3 (total size: 42)
+ // group 3: 17, 12, 7, 4 (total size: 42)
+ // group 4: 16, 11, 6, 5 (total size: 42)
+ List<List<Pair<SnapshotFileInfo, Long>>> splits = ExportSnapshot.getBalancedSplits(files, 5);
+ assertEquals(5, splits.size());
+
+ String[] split0 = new String[] {"file-20", "file-11", "file-10", "file-1", "file-0"};
+ verifyBalanceSplit(splits.get(0), split0, 42);
+ String[] split1 = new String[] {"file-19", "file-12", "file-9", "file-2"};
+ verifyBalanceSplit(splits.get(1), split1, 42);
+ String[] split2 = new String[] {"file-18", "file-13", "file-8", "file-3"};
+ verifyBalanceSplit(splits.get(2), split2, 42);
+ String[] split3 = new String[] {"file-17", "file-14", "file-7", "file-4"};
+ verifyBalanceSplit(splits.get(3), split3, 42);
+ String[] split4 = new String[] {"file-16", "file-15", "file-6", "file-5"};
+ verifyBalanceSplit(splits.get(4), split4, 42);
+ }
+
+ private void verifyBalanceSplit(final List<Pair<SnapshotFileInfo, Long>> split,
+ final String[] expected, final long expectedSize) {
+ assertEquals(expected.length, split.size());
+ long totalSize = 0;
+ for (int i = 0; i < expected.length; ++i) {
+ Pair<SnapshotFileInfo, Long> fileInfo = split.get(i);
+ assertEquals(expected[i], fileInfo.getFirst().getHfile());
+ totalSize += fileInfo.getSecond();
+ }
+ assertEquals(expectedSize, totalSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java
new file mode 100644
index 0000000..00778502
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestExportSnapshotNoCluster.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Category({MapReduceTests.class, MediumTests.class})
+public class TestExportSnapshotNoCluster {
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+ private static final Log LOG = LogFactory.getLog(TestExportSnapshotNoCluster.class);
+
+ protected final static HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
+
+ private static FileSystem fs;
+ private static Path testDir;
+
+ public static void setUpBaseConf(Configuration conf) {
+ conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ conf.setInt("hbase.regionserver.msginterval", 100);
+ conf.setInt("hbase.client.pause", 250);
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ conf.setBoolean("hbase.master.enabletable.roundrobin", true);
+ conf.setInt("mapreduce.map.maxattempts", 10);
+ conf.set(HConstants.HBASE_DIR, testDir.toString());
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ testDir = TEST_UTIL.getDataTestDir();
+ fs = testDir.getFileSystem(TEST_UTIL.getConfiguration());
+
+ setUpBaseConf(TEST_UTIL.getConfiguration());
+ }
+
+ /**
+ * Mock a snapshot with files in the archive dir,
+ * two regions, and one reference file.
+ */
+ @Test
+ public void testSnapshotWithRefsExportFileSystemState() throws Exception {
+ SnapshotMock snapshotMock = new SnapshotMock(TEST_UTIL.getConfiguration(), fs, testDir);
+ SnapshotMock.SnapshotBuilder builder = snapshotMock.createSnapshotV2("tableWithRefsV1",
+ "tableWithRefsV1");
+ testSnapshotWithRefsExportFileSystemState(builder);
+
+ snapshotMock = new SnapshotMock(TEST_UTIL.getConfiguration(), fs, testDir);
+ builder = snapshotMock.createSnapshotV2("tableWithRefsV2", "tableWithRefsV2");
+ testSnapshotWithRefsExportFileSystemState(builder);
+ }
+
+ /**
+ * Generates a couple of regions for the specified SnapshotMock,
+ * and then it will run the export and verification.
+ */
+ private void testSnapshotWithRefsExportFileSystemState(SnapshotMock.SnapshotBuilder builder)
+ throws Exception {
+ Path[] r1Files = builder.addRegion();
+ Path[] r2Files = builder.addRegion();
+ builder.commit();
+ int snapshotFilesCount = r1Files.length + r2Files.length;
+
+ byte[] snapshotName = Bytes.toBytes(builder.getSnapshotDescription().getName());
+ TableName tableName = builder.getTableDescriptor().getTableName();
+ TestExportSnapshot.testExportFileSystemState(TEST_UTIL.getConfiguration(),
+ tableName, snapshotName, snapshotName, snapshotFilesCount,
+ testDir, getDestinationDir(), false, null, true);
+ }
+
+ private Path getDestinationDir() {
+ Path path = new Path(new Path(testDir, "export-test"), "export-" + System.currentTimeMillis());
+ LOG.info("HDFS export destination path: " + path);
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java
new file mode 100644
index 0000000..7407a7d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobExportSnapshot.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Export Snapshot Tool
+ */
+@Ignore
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+public class TestMobExportSnapshot extends TestExportSnapshot {
+
+ public static void setUpBaseConf(Configuration conf) {
+ TestExportSnapshot.setUpBaseConf(conf);
+ conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setUpBaseConf(TEST_UTIL.getConfiguration());
+ TEST_UTIL.startMiniCluster(1, 3);
+ TEST_UTIL.startMiniMapReduceCluster();
+ }
+
+ @Override
+ protected void createTable() throws Exception {
+ MobSnapshotTestingUtils.createPreSplitMobTable(TEST_UTIL, tableName, 2, FAMILY);
+ }
+
+ @Override
+ protected RegionPredicate getBypassRegionPredicate() {
+ return new RegionPredicate() {
+ @Override
+ public boolean evaluate(final HRegionInfo regionInfo) {
+ return MobUtils.isMobRegionInfo(regionInfo);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
new file mode 100644
index 0000000..98d03c0
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestMobExportSnapshot using MobExportSnapshot in secure mode.
+ */
+@Ignore
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+public class TestMobSecureExportSnapshot extends TestMobExportSnapshot {
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setUpBaseConf(TEST_UTIL.getConfiguration());
+ // Setup separate test-data directory for MR cluster and set corresponding configurations.
+ // Otherwise, different test classes running MR cluster can step on each other.
+ TEST_UTIL.getDataTestDir();
+
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+
+ // setup configuration
+ SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+
+ TEST_UTIL.startMiniCluster(1, 3);
+ TEST_UTIL.startMiniMapReduceCluster();
+
+ // Wait for the ACL table to become available
+ TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
new file mode 100644
index 0000000..7d4832c
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+
+/**
+ * Reruns TestExportSnapshot using ExportSnapshot in secure mode.
+ */
+@Ignore
+@Category({VerySlowRegionServerTests.class, LargeTests.class})
+public class TestSecureExportSnapshot extends TestExportSnapshot {
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setUpBaseConf(TEST_UTIL.getConfiguration());
+ // Setup separate test-data directory for MR cluster and set corresponding configurations.
+ // Otherwise, different test classes running MR cluster can step on each other.
+ TEST_UTIL.getDataTestDir();
+
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+
+ // setup configuration
+ SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
+
+ TEST_UTIL.startMiniCluster(1, 3);
+ TEST_UTIL.startMiniMapReduceCluster();
+
+ // Wait for the ACL table to become available
+ TEST_UTIL.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+ }
+}