Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:29 UTC
[29/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
new file mode 100644
index 0000000..13b6a96
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
@@ -0,0 +1,287 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+
+/**
+ * <p>
+ * Tests various scan start and stop row scenarios. Each scenario is set on a
+ * Scan and then run through a MapReduce job, to verify that the boundaries
+ * are handed over and honored there as well.
+ * </p>
+ * <p>
+ * This test is broken into two parts in order to side-step the test timeout
+ * period of 900 seconds, as documented in HBASE-8326.
+ * </p>
+ */
+public abstract class TestTableInputFormatScanBase {
+
+ private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ static final TableName TABLE_NAME = TableName.valueOf("scantest");
+ static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
+ static final String KEY_STARTROW = "startRow";
+ static final String KEY_LASTROW = "stpRow";
+
+ private static Table table = null;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
+ // this turns it off for this test. TODO: Figure out why scr breaks recovery.
+ System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+
+ // switch TIF to log at DEBUG level
+ TEST_UTIL.enableDebug(TableInputFormat.class);
+ TEST_UTIL.enableDebug(TableInputFormatBase.class);
+ // start mini hbase cluster
+ TEST_UTIL.startMiniCluster(3);
+ // create and fill table
+ table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
+ TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Pass the key and value to reduce.
+ */
+ public static class ScanMapper
+ extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ /**
+ * Pass the key and value to reduce.
+ *
+ * @param key The key, here "aaa", "aab" etc.
+ * @param value The value is the same as the key.
+ * @param context The task context.
+ * @throws IOException When reading the rows fails.
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 2) {
+ throw new IOException("There should be two input columns");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cfMap = value.getMap();
+
+ if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
+ }
+
+ String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
+ String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
+ LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
+ ", value -> (" + val0 + ", " + val1 + ")");
+ context.write(key, key);
+ }
+ }
+
+ /**
+ * Checks the last and first key seen against the scanner boundaries.
+ */
+ public static class ScanReducer
+ extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+ NullWritable, NullWritable> {
+
+ private String first = null;
+ private String last = null;
+
+ protected void reduce(ImmutableBytesWritable key,
+ Iterable<ImmutableBytesWritable> values, Context context)
+ throws IOException, InterruptedException {
+ int count = 0;
+ for (ImmutableBytesWritable value : values) {
+ String val = Bytes.toStringBinary(value.get());
+ LOG.info("reduce: key[" + count + "] -> " +
+ Bytes.toStringBinary(key.get()) + ", value -> " + val);
+ if (first == null) first = val;
+ last = val;
+ count++;
+ }
+ }
+
+ protected void cleanup(Context context)
+ throws IOException, InterruptedException {
+ Configuration c = context.getConfiguration();
+ String startRow = c.get(KEY_STARTROW);
+ String lastRow = c.get(KEY_LASTROW);
+ LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
+ LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+ if (startRow != null && startRow.length() > 0) {
+ assertEquals(startRow, first);
+ }
+ if (lastRow != null && lastRow.length() > 0) {
+ assertEquals(lastRow, last);
+ }
+ }
+
+ }
+
+ /**
+ * Tests an MR Scan initialized from properties set in the Configuration.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ protected void testScanFromConfiguration(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
+ "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
+ c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", "
+ + Bytes.toString(INPUT_FAMILYS[1]));
+ c.set(KEY_STARTROW, start != null ? start : "");
+ c.set(KEY_LASTROW, last != null ? last : "");
+
+ if (start != null) {
+ c.set(TableInputFormat.SCAN_ROW_START, start);
+ }
+
+ if (stop != null) {
+ c.set(TableInputFormat.SCAN_ROW_STOP, stop);
+ }
+
+ Job job = new Job(c, jobName);
+ job.setMapperClass(ScanMapper.class);
+ job.setReducerClass(ScanReducer.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(ImmutableBytesWritable.class);
+ job.setInputFormatClass(TableInputFormat.class);
+ job.setNumReduceTasks(1);
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ TableMapReduceUtil.addDependencyJars(job);
+ assertTrue(job.waitForCompletion(true));
+ }
+
+ /**
+ * Tests an MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ protected void testScan(String start, String stop, String last)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
+ "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILYS[0]);
+ scan.addFamily(INPUT_FAMILYS[1]);
+ if (start != null) {
+ scan.setStartRow(Bytes.toBytes(start));
+ }
+ c.set(KEY_STARTROW, start != null ? start : "");
+ if (stop != null) {
+ scan.setStopRow(Bytes.toBytes(stop));
+ }
+ c.set(KEY_LASTROW, last != null ? last : "");
+ LOG.info("scan before: " + scan);
+ Job job = new Job(c, jobName);
+ TableMapReduceUtil.initTableMapperJob(
+ TABLE_NAME, scan, ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+ job.setReducerClass(ScanReducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
+
+ /**
+ * Tests an MR scan using the data-skew auto-balance feature.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ public void testNumOfSplits(String ratio, int expectedNumOfSplits)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ String jobName = "TestJobForNumOfSplits";
+ LOG.info("Before map/reduce startup - job " + jobName);
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILYS[0]);
+ scan.addFamily(INPUT_FAMILYS[1]);
+ c.set("hbase.mapreduce.input.autobalance", "true");
+ c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
+ c.set(KEY_STARTROW, "");
+ c.set(KEY_LASTROW, "");
+ Job job = new Job(c, jobName);
+ TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+ TableInputFormat tif = new TableInputFormat();
+ tif.setConf(job.getConfiguration());
+ Assert.assertEquals(TABLE_NAME, table.getName());
+ List<InputSplit> splits = tif.getSplits(job);
+ Assert.assertEquals(expectedNumOfSplits, splits.size());
+ }
+
+ /**
+ * Tests for the getSplitKey() method in TableInputFormatBase.java
+ */
+ public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
+ byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
+ Assert.assertArrayEquals(splitKey, result);
+ }
+}
+
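A concrete subclass drives the helpers above as ordinary JUnit tests. A minimal
sketch of that pattern (the class name is illustrative, not part of this
commit; row keys follow loadTable's "aaa".."zzz" convention):

    // Same package as the base class; assumes the usual JUnit and
    // HBase testclassification imports.
    @Category({VerySlowMapReduceTests.class, LargeTests.class})
    public class TestTableInputFormatScanExample extends TestTableInputFormatScanBase {

      // Whole-table scan: no start/stop row, so no boundary assertion fires.
      @Test
      public void testScanEmptyToEmpty()
          throws IOException, InterruptedException, ClassNotFoundException {
        testScan(null, null, null);
      }

      // Stop row "app" is exclusive, so the last row seen should be "apo".
      @Test
      public void testScanEmptyToAPP()
          throws IOException, InterruptedException, ClassNotFoundException {
        testScan(null, "app", "apo");
      }
    }
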
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
new file mode 100644
index 0000000..d702e0d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -0,0 +1,174 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotEnabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableMapReduce extends TestTableMapReduceBase {
+ private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
+
+ @Override
+ protected Log getLog() { return LOG; }
+
+ /**
+ * Pass the given key and the processed record to reduce.
+ */
+ static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
+
+ /**
+ * Pass the key and the reversed value to reduce.
+ *
+ * @param key The current row key.
+ * @param value The columns of the current row.
+ * @param context The task context.
+ * @throws IOException When reading the row fails.
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if (!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+
+ // Get the original value and reverse it
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
+ StringBuilder newValue = new StringBuilder(originalValue);
+ newValue.reverse();
+ // Now set the value to be collected
+ Put outval = new Put(key.get());
+ outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+ context.write(key, outval);
+ }
+ }
+
+ @Override
+ protected void runTestOnTable(Table table) throws IOException {
+ Job job = null;
+ try {
+ LOG.info("Before map/reduce startup");
+ job = new Job(table.getConfiguration(), "process column contents");
+ job.setNumReduceTasks(1);
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ TableMapReduceUtil.initTableMapperJob(
+ table.getName().getNameAsString(), scan,
+ ProcessContentsMapper.class, ImmutableBytesWritable.class,
+ Put.class, job);
+ TableMapReduceUtil.initTableReducerJob(
+ table.getName().getNameAsString(),
+ IdentityTableReducer.class, job);
+ FileOutputFormat.setOutputPath(job, new Path("test"));
+ LOG.info("Started " + table.getName().getNameAsString());
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("After map/reduce completion");
+
+ // verify map-reduce results
+ verify(table.getName());
+
+ verifyJobCountersAreEmitted(job);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } finally {
+ table.close();
+ if (job != null) {
+ FileUtil.fullyDelete(
+ new File(job.getConfiguration().get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ /**
+ * Verify that scan counters are emitted from the job.
+ * @param job The completed job whose counters are checked.
+ * @throws IOException When retrieving the counters fails.
+ */
+ private void verifyJobCountersAreEmitted(Job job) throws IOException {
+ Counters counters = job.getCounters();
+ Counter counter
+ = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
+ assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
+ assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
+ }
+
+ @Test(expected = TableNotEnabledException.class)
+ public void testWritingToDisabledTable() throws IOException {
+
+ try (Admin admin = UTIL.getConnection().getAdmin();
+ Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
+ admin.disableTable(table.getName());
+ runTestOnTable(table);
+ fail("Should not have reached here, should have thrown an exception");
+ }
+ }
+
+ @Test(expected = TableNotFoundException.class)
+ public void testWritingToNonExistentTable() throws IOException {
+
+ try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
+ runTestOnTable(table);
+ fail("Should not have reached here, should have thrown an exception");
+ }
+ }
+}
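
The counter assertion above generalizes: any TableInputFormat-based job
publishes its scan metrics in the same counter group. A short sketch of
dumping the whole group once a job has finished (a hypothetical driver
snippet using the same imports as this test, not part of the commit):

    // After job.waitForCompletion(true) has returned successfully:
    Counters counters = job.getCounters();
    for (Counter c : counters.getGroup(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME)) {
      // Prints RPC_CALLS and the rest of the scan metrics for this job.
      LOG.info(c.getName() + " = " + c.getValue());
    }
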
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
new file mode 100644
index 0000000..27bf063
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -0,0 +1,233 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+
+/**
+ * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of a particular cell,
+ * and write it back to the table. Implements common components between mapred and mapreduce
+ * implementations.
+ */
+public abstract class TestTableMapReduceBase {
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
+ protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
+ protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+
+ protected static final byte[][] columns = new byte[][] {
+ INPUT_FAMILY,
+ OUTPUT_FAMILY
+ };
+
+ /**
+ * Retrieve my logger instance.
+ */
+ protected abstract Log getLog();
+
+ /**
+ * Handles API-specifics for setting up and executing the job.
+ */
+ protected abstract void runTestOnTable(Table table) throws IOException;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ Table table =
+ UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
+ OUTPUT_FAMILY });
+ UTIL.loadTable(table, INPUT_FAMILY, false);
+ UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS);
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Test a map/reduce against a multi-region table
+ * @throws IOException
+ */
+ @Test
+ public void testMultiRegionTable() throws IOException {
+ runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
+ }
+
+ @Test
+ public void testCombiner() throws IOException {
+ Configuration conf = new Configuration(UTIL.getConfiguration());
+ // force use of combiner for testing purposes
+ conf.setInt("mapreduce.map.combine.minspills", 1);
+ runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
+ }
+
+ /**
+ * Implements mapper logic for use across APIs.
+ */
+ protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if (!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+
+ // Get the original value and reverse it
+
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
+ StringBuilder newValue = new StringBuilder(originalValue);
+ newValue.reverse();
+
+ // Now set the value to be collected
+
+ Put outval = new Put(key.get());
+ outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+ return outval;
+ }
+
+ protected void verify(TableName tableName) throws IOException {
+ Table table = UTIL.getConnection().getTable(tableName);
+ boolean verified = false;
+ long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ getLog().info("Verification attempt #" + i);
+ verifyAttempt(table);
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume it's because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ getLog().debug("Verification attempt failed: " + e.getMessage());
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ assertTrue(verified);
+ }
+
+ /**
+ * Looks at every value of the mapreduce output and verifies that
+ * the values have indeed been reversed.
+ * @param table Table to scan.
+ * @throws IOException
+ * @throws NullPointerException if we failed to find a cell value
+ */
+ private void verifyAttempt(final Table table) throws IOException, NullPointerException {
+ Scan scan = new Scan();
+ TableInputFormat.addColumns(scan, columns);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ Iterator<Result> itr = scanner.iterator();
+ assertTrue(itr.hasNext());
+ while (itr.hasNext()) {
+ Result r = itr.next();
+ if (getLog().isDebugEnabled()) {
+ if (r.size() > 2) {
+ throw new IOException("Too many results, expected 2 got " +
+ r.size());
+ }
+ }
+ byte[] firstValue = null;
+ byte[] secondValue = null;
+ int count = 0;
+ for (Cell kv : r.listCells()) {
+ if (count == 0) {
+ firstValue = CellUtil.cloneValue(kv);
+ }
+ if (count == 1) {
+ secondValue = CellUtil.cloneValue(kv);
+ }
+ count++;
+ if (count == 2) {
+ break;
+ }
+ }
+
+ if (firstValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": first value is null");
+ }
+ String first = Bytes.toString(firstValue);
+
+ if (secondValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": second value is null");
+ }
+ byte[] secondReversed = new byte[secondValue.length];
+ for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+ secondReversed[i] = secondValue[j];
+ }
+ String second = Bytes.toString(secondReversed);
+
+ if (first.compareTo(second) != 0) {
+ if (getLog().isDebugEnabled()) {
+ getLog().debug("second key is not the reverse of first. row=" +
+ Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+ ", second value=" + second);
+ }
+ fail();
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+}
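
As a worked example of the shared map() contract above: a one-cell Result
whose value equals its row key (the convention loadTable uses) maps to a Put
holding the reversed string. A sketch, fabricating the input with KeyValue
and Result.create (the row literal is arbitrary):

    byte[] row = Bytes.toBytes("abc");
    // loadTable stores the row key as the value of contents:contents.
    Cell cell = new org.apache.hadoop.hbase.KeyValue(row, INPUT_FAMILY, INPUT_FAMILY, row);
    Result one = Result.create(java.util.Collections.singletonList(cell));
    Put put = map(new ImmutableBytesWritable(row), one);
    // put now writes "cba" into OUTPUT_FAMILY (text:null) for row "abc".
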
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
new file mode 100644
index 0000000..506bf4f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test different variants of initTableMapperJob method
+ */
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestTableMapReduceUtil {
+
+ /*
+ * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
+ * the method depends on an online cluster.
+ */
+
+ @Test
+ public void testInitTableMapperJob1() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ // test
+ TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
+ Text.class, job, false, WALInputFormat.class);
+ assertEquals(WALInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+ }
+
+ @Test
+ public void testInitTableMapperJob2() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+ Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
+ assertEquals(WALInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+ }
+
+ @Test
+ public void testInitTableMapperJob3() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+ Import.Importer.class, Text.class, Text.class, job);
+ assertEquals(TableInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+ }
+
+ @Test
+ public void testInitTableMapperJob4() throws Exception {
+ Configuration configuration = new Configuration();
+ Job job = new Job(configuration, "tableName");
+ TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
+ Import.Importer.class, Text.class, Text.class, job, false);
+ assertEquals(TableInputFormat.class, job.getInputFormatClass());
+ assertEquals(Import.Importer.class, job.getMapperClass());
+ assertEquals(LongWritable.class, job.getOutputKeyClass());
+ assertEquals(Text.class, job.getOutputValueClass());
+ assertNull(job.getCombinerClass());
+ assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
+ }
+}
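
Outside the test harness, the same initTableMapperJob variants wire up a real
driver. A minimal sketch (MyMapper and the table name are hypothetical; the
output classes mirror the mapper's declared outputs):

    // Assumes org.apache.hadoop.hbase.HBaseConfiguration is imported.
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "scan-driver");
    TableMapReduceUtil.initTableMapperJob("myTable", new Scan(),
        MyMapper.class,          // hypothetical TableMapper<Text, Text> subclass
        Text.class, Text.class,  // the mapper's output key/value classes
        job);
    job.waitForCompletion(true);
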
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
new file mode 100644
index 0000000..028df98
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
+ private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+
+ private static final byte[] bbb = Bytes.toBytes("bbb");
+ private static final byte[] yyy = Bytes.toBytes("yyy");
+
+ @Rule
+ public TestName name = new TestName();
+
+ @Override
+ protected byte[] getStartRow() {
+ return bbb;
+ }
+
+ @Override
+ protected byte[] getEndRow() {
+ return yyy;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testGetBestLocations() throws IOException {
+ TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
+ Configuration conf = UTIL.getConfiguration();
+
+ HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
+ Assert.assertEquals(Lists.newArrayList(),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution = new HDFSBlocksDistribution();
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
+ Assert.assertEquals(Lists.newArrayList("h1"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
+ Assert.assertEquals(Lists.newArrayList("h1", "h2"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
+ Assert.assertEquals(Lists.newArrayList("h2", "h1"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
+ blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
+
+ Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
+ TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
+ }
+
+ public static enum TestTableSnapshotCounters {
+ VALIDATION_ERROR
+ }
+
+ public static class TestTableSnapshotMapper
+ extends TableMapper<ImmutableBytesWritable, NullWritable> {
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value,
+ Context context) throws IOException, InterruptedException {
+ // Validate a single row coming from the snapshot, and emit the row key
+ verifyRowFromMap(key, value);
+ context.write(key, NullWritable.get());
+ }
+ }
+
+ public static class TestTableSnapshotReducer
+ extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
+ HBaseTestingUtility.SeenRowTracker rowTracker =
+ new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
+ @Override
+ protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
+ Context context) throws IOException, InterruptedException {
+ rowTracker.addRow(key.get());
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException,
+ InterruptedException {
+ rowTracker.validate();
+ }
+ }
+
+ @Test
+ public void testInitTableSnapshotMapperJobConfig() throws Exception {
+ setupCluster();
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ String snapshotName = "foo";
+
+ try {
+ createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
+ Job job = new Job(UTIL.getConfiguration());
+ Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+
+ // TODO: would be better to examine directly the cache instance that results from this
+ // config. Currently this is not possible because BlockCache initialization is static.
+ Assert.assertEquals(
+ "Snapshot job should be configured for default LruBlockCache.",
+ HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
+ job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
+ Assert.assertEquals(
+ "Snapshot job should not use BucketCache.",
+ 0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
+ } finally {
+ UTIL.getAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ @Override
+ public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
+ String snapshotName, Path tmpTableDir) throws Exception {
+ Job job = new Job(UTIL.getConfiguration());
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+ }
+
+ @Override
+ public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
+ int numRegions, int expectedNumSplits) throws Exception {
+ setupCluster();
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ try {
+ createTableAndSnapshot(
+ util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
+
+ Job job = new Job(util.getConfiguration());
+ Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, false, tmpTableDir);
+
+ verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
+
+ } finally {
+ util.getAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ @Test
+ public void testNoDuplicateResultsWhenSplitting() throws Exception {
+ setupCluster();
+ TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
+ String snapshotName = "testSnapshotBug";
+ try {
+ if (UTIL.getAdmin().tableExists(tableName)) {
+ UTIL.deleteTable(tableName);
+ }
+
+ UTIL.createTable(tableName, FAMILIES);
+ Admin admin = UTIL.getAdmin();
+
+ // put some stuff in the table
+ Table table = UTIL.getConnection().getTable(tableName);
+ UTIL.loadTable(table, FAMILIES);
+
+ // split to 2 regions
+ admin.split(tableName, Bytes.toBytes("eee"));
+ TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);
+
+ Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
+ FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+ null, snapshotName, rootDir, fs, true);
+
+ // load different values
+ byte[] value = Bytes.toBytes("after_snapshot_value");
+ UTIL.loadTable(table, FAMILIES, value);
+
+ // cause flush to create new files in the region
+ admin.flush(tableName);
+ table.close();
+
+ Job job = new Job(UTIL.getConfiguration());
+ Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+ // limit the scan
+ Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+ TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
+ tmpTableDir);
+
+ verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
+ } finally {
+ UTIL.getAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
+ byte[] startRow, byte[] stopRow)
+ throws IOException, InterruptedException {
+ TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
+ List<InputSplit> splits = tsif.getSplits(job);
+
+ Assert.assertEquals(expectedNumSplits, splits.size());
+
+ HBaseTestingUtility.SeenRowTracker rowTracker =
+ new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
+
+ for (int i = 0; i < splits.size(); i++) {
+ // validate input split
+ InputSplit split = splits.get(i);
+ Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
+
+ // validate record reader
+ TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
+ when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
+ RecordReader<ImmutableBytesWritable, Result> rr =
+ tsif.createRecordReader(split, taskAttemptContext);
+ rr.initialize(split, taskAttemptContext);
+
+ // validate we can read all the data back
+ while (rr.nextKeyValue()) {
+ byte[] row = rr.getCurrentKey().get();
+ verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
+ rowTracker.addRow(row);
+ }
+
+ rr.close();
+ }
+
+ // validate all rows are seen
+ rowTracker.validate();
+ }
+
+ @Override
+ protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
+ boolean shutdownCluster) throws Exception {
+ doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
+ numRegions, expectedNumSplits, shutdownCluster);
+ }
+
+ // this is also called by the IntegrationTestTableSnapshotInputFormat
+ public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
+ String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
+ int expectedNumSplits, boolean shutdownCluster) throws Exception {
+
+ LOG.info("testing with MapReduce");
+
+ LOG.info("create the table and snapshot");
+ createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
+
+ if (shutdownCluster) {
+ LOG.info("shutting down hbase cluster.");
+ util.shutdownMiniHBaseCluster();
+ }
+
+ try {
+ // create the job
+ Job job = new Job(util.getConfiguration());
+ Scan scan = new Scan(startRow, endRow); // limit the scan
+
+ job.setJarByClass(util.getClass());
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+ TestTableSnapshotInputFormat.class);
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
+ scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
+ NullWritable.class, job, true, tableDir);
+
+ job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
+ job.setNumReduceTasks(1);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ } finally {
+ if (!shutdownCluster) {
+ util.getAdmin().deleteSnapshot(snapshotName);
+ util.deleteTable(tableName);
+ }
+ }
+ }
+}
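
Taken together, the snapshot-input workflow outside of tests is: snapshot the
table, point the job at the snapshot plus a scratch restore directory, and
run. A sketch under those assumptions (snapshot name, table, mapper, and path
are all illustrative):

    admin.snapshot("mySnapshot", TableName.valueOf("myTable"));
    Job job = new Job(conf, "read-from-snapshot");
    TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", new Scan(),
        MyMapper.class, ImmutableBytesWritable.class, NullWritable.class,
        job, true /* addDependencyJars */, new Path("/tmp/snapshot-restore"));
    job.waitForCompletion(true);
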
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
new file mode 100644
index 0000000..4382c9c
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestTableSplit {
+ @Rule
+ public TestName name = new TestName();
+
+ @Test
+ public void testHashCode() {
+ TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
+ "row-start".getBytes(),
+ "row-end".getBytes(), "location");
+ TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()),
+ "row-start".getBytes(),
+ "row-end".getBytes(), "location");
+ assertEquals(split1, split2);
+ assertTrue(split1.hashCode() == split2.hashCode());
+ HashSet<TableSplit> set = new HashSet<>(2);
+ set.add(split1);
+ set.add(split2);
+ assertTrue(set.size() == 1);
+ }
+
+ /**
+ * The length of a region should not influence the hashcode.
+ */
+ @Test
+ public void testHashCode_length() {
+ TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
+ "row-start".getBytes(),
+ "row-end".getBytes(), "location", 1984);
+ TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()),
+ "row-start".getBytes(),
+ "row-end".getBytes(), "location", 1982);
+
+ assertEquals(split1, split2);
+ assertTrue(split1.hashCode() == split2.hashCode());
+ HashSet<TableSplit> set = new HashSet<>(2);
+ set.add(split1);
+ set.add(split2);
+ assertTrue(set.size() == 1);
+ }
+
+ /**
+ * The length of a region needs to be properly serialized.
+ */
+ @Test
+ public void testLengthIsSerialized() throws Exception {
+ TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
+ "row-start".getBytes(),
+ "row-end".getBytes(), "location", 666);
+
+ TableSplit deserialized = new TableSplit(TableName.valueOf(name.getMethodName()),
+ "row-start2".getBytes(),
+ "row-end2".getBytes(), "location1");
+ ReflectionUtils.copy(new Configuration(), split1, deserialized);
+
+ Assert.assertEquals(666, deserialized.getLength());
+ }
+
+ @Test
+ public void testToString() {
+ TableSplit split =
+ new TableSplit(TableName.valueOf(name.getMethodName()), "row-start".getBytes(), "row-end".getBytes(),
+ "location");
+ String str =
+ "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, "
+ + "end row: row-end, region location: location, "
+ + "encoded region name: )";
+ Assert.assertEquals(str, split.toString());
+
+ split =
+ new TableSplit(TableName.valueOf(name.getMethodName()), null, "row-start".getBytes(),
+ "row-end".getBytes(), "location", "encoded-region-name", 1000L);
+ str =
+ "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, "
+ + "end row: row-end, region location: location, "
+ + "encoded region name: encoded-region-name)";
+ Assert.assertEquals(str, split.toString());
+
+ split = new TableSplit((TableName) null, null, null, null);
+ str =
+ "HBase table split(table name: null, scan: , start row: null, "
+ + "end row: null, region location: null, "
+ + "encoded region name: )";
+ Assert.assertEquals(str, split.toString());
+
+ split = new TableSplit((TableName) null, null, null, null, null, null, 1000L);
+ str =
+ "HBase table split(table name: null, scan: , start row: null, "
+ + "end row: null, region location: null, "
+ + "encoded region name: null)";
+ Assert.assertEquals(str, split.toString());
+ }
+}
+
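The ReflectionUtils.copy() call in testLengthIsSerialized stands in for a
plain Writable round-trip; roughly the same check done by hand looks like
this (a sketch using Hadoop's org.apache.hadoop.io buffer classes):

    DataOutputBuffer out = new DataOutputBuffer();
    split1.write(out);                   // serializes the split, length field included
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    TableSplit copy = new TableSplit();  // Writable requires the no-arg constructor
    copy.readFields(in);
    // copy.getLength() == split1.getLength()
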
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
new file mode 100644
index 0000000..6796c94
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
@@ -0,0 +1,211 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestTimeRangeMapRed {
+ private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);
+ private static final HBaseTestingUtility UTIL =
+ new HBaseTestingUtility();
+ private Admin admin;
+
+ private static final byte [] KEY = Bytes.toBytes("row1");
+ private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
+ static {
+ TIMESTAMP.put((long)1245620000, false);
+ TIMESTAMP.put((long)1245620005, true); // include
+ TIMESTAMP.put((long)1245620010, true); // include
+ TIMESTAMP.put((long)1245620055, true); // include
+ TIMESTAMP.put((long)1245620100, true); // include
+ TIMESTAMP.put((long)1245620150, false);
+ TIMESTAMP.put((long)1245620250, false);
+ }
+ static final long MINSTAMP = 1245620005;
+ static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded, so increment it.
+
+ static final TableName TABLE_NAME = TableName.valueOf("table123");
+ static final byte[] FAMILY_NAME = Bytes.toBytes("text");
+ static final byte[] COLUMN_NAME = Bytes.toBytes("input");
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws Exception {
+ this.admin = UTIL.getAdmin();
+ }
+
+ private static class ProcessTimeRangeMapper
+ extends TableMapper<ImmutableBytesWritable, MapWritable>
+ implements Configurable {
+
+ private Configuration conf = null;
+ private Table table = null;
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result result,
+ Context context)
+ throws IOException {
+ List<Long> tsList = new ArrayList<>();
+ for (Cell kv : result.listCells()) {
+ tsList.add(kv.getTimestamp());
+ }
+
+ List<Put> puts = new ArrayList<>();
+ for (Long ts : tsList) {
+ Put put = new Put(key.get());
+ put.setDurability(Durability.SKIP_WAL);
+ put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
+ puts.add(put);
+ }
+ table.put(puts);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ try {
+ connection = ConnectionFactory.createConnection(conf);
+ table = connection.getTable(TABLE_NAME);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to connect to table " + TABLE_NAME, e);
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException {
+ // Close what setConf() opened so the mapper does not leak a connection.
+ if (table != null) {
+ table.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void testTimeRangeMapRed()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+ final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
+ col.setMaxVersions(Integer.MAX_VALUE);
+ desc.addFamily(col);
+ admin.createTable(desc);
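+ // Seed one cell per timestamp with 'false'; the MR job then flips the cells
+ // whose timestamps fall inside the scanned range to 'true'.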
+ List<Put> puts = new ArrayList<>();
+ for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
+ Put put = new Put(KEY);
+ put.setDurability(Durability.SKIP_WAL);
+ put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
+ puts.add(put);
+ }
+ Table table = UTIL.getConnection().getTable(desc.getTableName());
+ table.put(puts);
+ runTestOnTable();
+ verify(table);
+ table.close();
+ }
+
+ private void runTestOnTable()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = null;
+ try {
+ job = Job.getInstance(UTIL.getConfiguration(), "test123");
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setNumReduceTasks(0);
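+ // Scan a single column over the half-open time range [MINSTAMP, MAXSTAMP),
+ // keeping all versions, so the mapper sees only the in-range cells.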
+ Scan scan = new Scan();
+ scan.addColumn(FAMILY_NAME, COLUMN_NAME);
+ scan.setTimeRange(MINSTAMP, MAXSTAMP);
+ scan.setMaxVersions();
+ TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
+ scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
+ org.junit.Assert.assertTrue("Job failed", job.waitForCompletion(true));
+ } finally {
+ if (job != null) {
+ FileUtil.fullyDelete(
+ new File(job.getConfiguration().get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ private void verify(final Table table) throws IOException {
+ Scan scan = new Scan();
+ scan.addColumn(FAMILY_NAME, COLUMN_NAME);
+ scan.setMaxVersions(1);
+ ResultScanner scanner = table.getScanner(scan);
+ for (Result r : scanner) {
+ for (Cell kv : r.listCells()) {
+ LOG.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
+ + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv))
+ + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv)));
+ org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
+ Bytes.toBoolean(CellUtil.cloneValue(kv)));
+ }
+ }
+ scanner.close();
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
new file mode 100644
index 0000000..427c5cc
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LauncherSecurityManager;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Basic test for the WALPlayer M/R tool
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestWALPlayer {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static MiniHBaseCluster cluster;
+ private static Path rootDir;
+ private static Path walRootDir;
+ private static FileSystem fs;
+ private static FileSystem logFs;
+ private static Configuration conf;
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ rootDir = TEST_UTIL.createRootDir();
+ walRootDir = TEST_UTIL.createWALRootDir();
+ fs = FSUtils.getRootDirFileSystem(conf);
+ logFs = FSUtils.getWALFileSystem(conf);
+ cluster = TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ fs.delete(rootDir, true);
+ logFs.delete(walRootDir, true);
+ }
+
+ /**
+ * Simple end-to-end test
+ * @throws Exception
+ */
+ @Test
+ public void testWALPlayer() throws Exception {
+ final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+ final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
+ final byte[] FAMILY = Bytes.toBytes("family");
+ final byte[] COLUMN1 = Bytes.toBytes("c1");
+ final byte[] COLUMN2 = Bytes.toBytes("c2");
+ final byte[] ROW = Bytes.toBytes("row");
+ Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+ Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+ // put a row into the first table
+ Put p = new Put(ROW);
+ p.addColumn(FAMILY, COLUMN1, COLUMN1);
+ p.addColumn(FAMILY, COLUMN2, COLUMN2);
+ t1.put(p);
+ // delete one column
+ Delete d = new Delete(ROW);
+ d.addColumns(FAMILY, COLUMN1);
+ t1.delete(d);
+
+ // replay the WAL, map table 1 to table 2
+ WAL log = cluster.getRegionServer(0).getWAL(null);
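+ // Roll the WAL so the edits above sit in a closed log file that the
+ // WALPlayer job can read.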
+ log.rollWriter();
+ String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
+ .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
+
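+ // Exercise WALPlayer's time-option parsing: setupTime() rewrites the option
+ // value in the configuration as a long so the job can consume it.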
+ Configuration configuration = TEST_UTIL.getConfiguration();
+ WALPlayer player = new WALPlayer(configuration);
+ String optionName = "_test_.name";
+ configuration.set(optionName, "1000");
+ player.setupTime(configuration, optionName);
+ assertEquals(1000, configuration.getLong(optionName, 0));
+ assertEquals(0, ToolRunner.run(configuration, player,
+ new String[] {walInputDir, tableName1.getNameAsString(),
+ tableName2.getNameAsString() }));
+
+ // verify the WAL was replayed into table 2
+ Get g = new Get(ROW);
+ Result r = t2.get(g);
+ assertEquals(1, r.size());
+ assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+ }
+
+ /**
+ * Test WALKeyValueMapper setup and map
+ */
+ @Test
+ public void testWALKeyValueMapper() throws Exception {
+ testWALKeyValueMapper(WALPlayer.TABLES_KEY);
+ }
+
+ @Test
+ public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
+ testWALKeyValueMapper("hlog.input.tables");
+ }
+
+ private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.set(tableConfigKey, "table");
+ WALKeyValueMapper mapper = new WALKeyValueMapper();
+ WALKey key = mock(WALKey.class);
+ when(key.getTablename()).thenReturn(TableName.valueOf("table"));
+ @SuppressWarnings("unchecked")
+ Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class);
+ when(context.getConfiguration()).thenReturn(configuration);
+
+ WALEdit value = mock(WALEdit.class);
+ ArrayList<Cell> values = new ArrayList<>();
+ KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null);
+
+ values.add(kv1);
+ when(value.getCells()).thenReturn(values);
+ mapper.setup(context);
+
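+ // Intercept context.write() and assert the mapper emits the row as the
+ // output key together with a KeyValue for that same row.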
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
+ KeyValue kv = (KeyValue) invocation.getArguments()[1];
+ assertEquals("row", Bytes.toString(writer.get()));
+ assertEquals("row", Bytes.toString(CellUtil.cloneRow(kv)));
+ return null;
+ }
+ }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
+
+ mapper.map(key, value, context);
+
+ }
+
+ /**
+ * Test main method
+ */
+ @Test
+ public void testMainMethod() throws Exception {
+
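+ // LauncherSecurityManager turns System.exit() into a SecurityException so
+ // the test can assert on the exit code without terminating the JVM.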
+ PrintStream oldPrintStream = System.err;
+ SecurityManager oldSecurityManager = System.getSecurityManager();
+ LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
+ System.setSecurityManager(newSecurityManager);
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
+ String[] args = {};
+ try {
+ System.setErr(new PrintStream(data));
+ try {
+ WALPlayer.main(args);
+ fail("should be SecurityException");
+ } catch (SecurityException e) {
+ assertEquals(-1, newSecurityManager.getExitCode());
+ assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
+ assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
+ " <tables> [<tableMappings>]"));
+ assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
+ }
+
+ } finally {
+ System.setErr(oldPrintStream);
+ System.setSecurityManager(oldSecurityManager);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
new file mode 100644
index 0000000..34725b4
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the WALRecordReader
+ */
+@Category({MapReduceTests.class, MediumTests.class})
+public class TestWALRecordReader {
+ private static final Log LOG = LogFactory.getLog(TestWALRecordReader.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Configuration conf;
+ private static FileSystem fs;
+ private static Path hbaseDir;
+ private static FileSystem walFs;
+ private static Path walRootDir;
+ // visible for TestHLogRecordReader
+ static final TableName tableName = TableName.valueOf(getName());
+ private static final byte [] rowName = tableName.getName();
+ // visible for TestHLogRecordReader
+ static final HRegionInfo info = new HRegionInfo(tableName,
+ Bytes.toBytes(""), Bytes.toBytes(""), false);
+ private static final byte [] family = Bytes.toBytes("column");
+ private static final byte [] value = Bytes.toBytes("value");
+ private static HTableDescriptor htd;
+ private static Path logDir;
+ protected MultiVersionConcurrencyControl mvcc;
+ protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+ private static String getName() {
+ return "TestWALRecordReader";
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ fs.delete(hbaseDir, true);
+ walFs.delete(walRootDir, true);
+ mvcc = new MultiVersionConcurrencyControl();
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Make block sizes small.
+ conf = TEST_UTIL.getConfiguration();
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.replication", 1);
+ TEST_UTIL.startMiniDFSCluster(1);
+
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+ hbaseDir = TEST_UTIL.createRootDir();
+ walRootDir = TEST_UTIL.createWALRootDir();
+ walFs = FSUtils.getWALFileSystem(conf);
+ logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
+
+ htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ fs.delete(hbaseDir, true);
+ walFs.delete(walRootDir, true);
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Test partial reads from the log based on passed time range
+ * @throws Exception
+ */
+ @Test
+ public void testPartialRead() throws Exception {
+ final WALFactory walfactory = new WALFactory(conf, null, getName());
+ WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+ // This test depends on timestamps being millisecond-based and on the WAL
+ // file names also being millisecond-based.
+ long ts = System.currentTimeMillis();
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
+ log.append(info, getWalKey(ts, scopes), edit, true);
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
+ log.append(info, getWalKey(ts+1, scopes), edit, true);
+ log.sync();
+ LOG.info("Before 1st WAL roll " + log.toString());
+ log.rollWriter();
+ LOG.info("Past 1st WAL roll " + log.toString());
+
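+ // Sleep so the edits after the roll get strictly later timestamps than the
+ // first batch.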
+ Thread.sleep(1);
+ long ts1 = System.currentTimeMillis();
+
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
+ log.append(info, getWalKey(ts1+1, scopes), edit, true);
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
+ log.append(info, getWalKey(ts1+2, scopes), edit, true);
+ log.sync();
+ log.shutdown();
+ walfactory.shutdown();
+ LOG.info("Closed WAL " + log.toString());
+
+
+ WALInputFormat input = new WALInputFormat();
+ Configuration jobConf = new Configuration(conf);
+ jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
+
+ // only 1st file is considered, and only its 1st entry is used
+ List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+
+ assertEquals(1, splits.size());
+ testSplit(splits.get(0), Bytes.toBytes("1"));
+
+ jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
+ splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ // both files need to be considered
+ assertEquals(2, splits.size());
+ // only the 2nd entry from the 1st file is used
+ testSplit(splits.get(0), Bytes.toBytes("2"));
+ // only the 1st entry from the 2nd file is used
+ testSplit(splits.get(1), Bytes.toBytes("3"));
+ }
+
+ /**
+ * Test basic functionality
+ * @throws Exception
+ */
+ @Test
+ public void testWALRecordReader() throws Exception {
+ final WALFactory walfactory = new WALFactory(conf, null, getName());
+ WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
+ byte [] value = Bytes.toBytes("value");
+ final AtomicLong sequenceId = new AtomicLong(0);
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+ System.currentTimeMillis(), value));
+ long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
+ log.sync(txid);
+
+ Thread.sleep(1); // make sure 2nd log gets a later timestamp
+ long secondTs = System.currentTimeMillis();
+ log.rollWriter();
+
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+ System.currentTimeMillis(), value));
+ txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
+ log.sync(txid);
+ log.shutdown();
+ walfactory.shutdown();
+ long thirdTs = System.currentTimeMillis();
+
+ // should have 2 log files now
+ WALInputFormat input = new WALInputFormat();
+ Configuration jobConf = new Configuration(conf);
+ jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+
+ // make sure both logs are found
+ List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ assertEquals(2, splits.size());
+
+ // should return exactly one KV
+ testSplit(splits.get(0), Bytes.toBytes("1"));
+ // same for the 2nd split
+ testSplit(splits.get(1), Bytes.toBytes("2"));
+
+ // now test basic time ranges:
+
+ // set an endtime, the 2nd log file can be ignored completely.
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
+ splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ assertEquals(1, splits.size());
+ testSplit(splits.get(0), Bytes.toBytes("1"));
+
+ // now set a start time
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
+ jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
+ splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ // both logs need to be considered
+ assertEquals(2, splits.size());
+ // but both readers skip all edits
+ testSplit(splits.get(0));
+ testSplit(splits.get(1));
+ }
+
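+ // Overridden by TestHLogRecordReader to exercise the legacy HLog-based key
+ // and reader.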
+ protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
+ }
+
+ protected WALRecordReader getReader() {
+ return new WALKeyRecordReader();
+ }
+
+ /**
+ * Create a new reader from the split, and match the edits against the passed columns.
+ */
+ private void testSplit(InputSplit split, byte[]... columns) throws Exception {
+ final WALRecordReader reader = getReader();
+ reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+
+ for (byte[] column : columns) {
+ assertTrue(reader.nextKeyValue());
+ Cell cell = reader.getCurrentValue().getCells().get(0);
+ assertEquals("unexpected qualifier", Bytes.toString(column),
+ Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength()));
+ }
+ assertFalse(reader.nextKeyValue());
+ reader.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
new file mode 100644
index 0000000..aea5036
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Dummy mapper used for unit tests to verify that the mapper can be injected.
+ * This approach would be used if a custom transformation needed to be done after
+ * reading the input data before writing it to HFiles.
+ */
+public class TsvImporterCustomTestMapper extends TsvImporterMapper {
+
+ @Override
+ protected void setup(Context context) {
+ doSetup(context);
+ }
+
+ /**
+ * Convert a line of TSV text into an HBase table row after transforming the
+ * values by multiplying them by 3.
+ */
+ @Override
+ public void map(LongWritable offset, Text value, Context context)
+ throws IOException {
+ byte[] family = Bytes.toBytes("FAM");
+ final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
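+ // Fields in the test input are separated by the ESC character (\u001b).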
+
+ // do some basic line parsing
+ // Text#getBytes() returns the backing array, which can be longer than the
+ // content, so bound the decode with getLength().
+ byte[] lineBytes = value.getBytes();
+ String[] valueTokens = new String(lineBytes, 0, value.getLength(), "UTF-8").split("\u001b");
+
+ // create the rowKey and Put
+ ImmutableBytesWritable rowKey =
+ new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
+ Put put = new Put(rowKey.copyBytes());
+ put.setDurability(Durability.SKIP_WAL);
+
+ // Each value looks like VALUE1 or VALUE2; multiply the trailing integer by 3.
+ for (int i = 1; i < valueTokens.length; i++) {
+ String prefix = valueTokens[i].substring(0, "VALUE".length());
+ String suffix = valueTokens[i].substring("VALUE".length());
+ String newValue = prefix + Integer.parseInt(suffix) * 3;
+
+ KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
+ qualifiers[i-1], Bytes.toBytes(newValue));
+ put.add(kv);
+ }
+
+ try {
+ context.write(rowKey, put);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted while writing row "
+ + Bytes.toString(rowKey.get()));
+ }
+ }
+}