Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:30 UTC
[30/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
new file mode 100644
index 0000000..694a359
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple: take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestMultithreadedTableMapper {
+ private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class);
+ private static final HBaseTestingUtility UTIL =
+ new HBaseTestingUtility();
+ static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+ static final int NUMBER_OF_THREADS = 10;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ // Up the handlers; this test needs more than usual.
+ UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
+ UTIL.startMiniCluster();
+ Table table =
+ UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
+ OUTPUT_FAMILY });
+ UTIL.loadTable(table, INPUT_FAMILY, false);
+ UTIL.waitUntilAllRegionsAssigned(MULTI_REGION_TABLE_NAME);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Pass the given key and the processed record to the reducer.
+ */
+ public static class ProcessContentsMapper
+ extends TableMapper<ImmutableBytesWritable, Put> {
+
+ /**
+ * Pass the key and the reversed value to the reducer.
+ *
+ * @param key
+ * @param value
+ * @param context
+ * @throws IOException
+ */
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if(!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ // Get the original value and reverse it
+ String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
+ StringBuilder newValue = new StringBuilder(originalValue);
+ newValue.reverse();
+ // Now set the value to be collected
+ Put outval = new Put(key.get());
+ outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+ context.write(key, outval);
+ }
+ }
+
+ /**
+ * Test MultithreadedTableMapper map/reduce against a multi-region table.
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testMultithreadedTableMapper()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
+ }
+
+ private void runTestOnTable(Table table)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = null;
+ try {
+ LOG.info("Before map/reduce startup");
+ job = new Job(table.getConfiguration(), "process column contents");
+ job.setNumReduceTasks(1);
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ TableMapReduceUtil.initTableMapperJob(
+ table.getName(), scan,
+ MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+ Put.class, job);
+ MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
+ MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
+ TableMapReduceUtil.initTableReducerJob(
+ table.getName().getNameAsString(),
+ IdentityTableReducer.class, job);
+ FileOutputFormat.setOutputPath(job, new Path("test"));
+ LOG.info("Started " + table.getName());
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("After map/reduce completion");
+ // verify map-reduce results
+ verify(table.getName());
+ } finally {
+ table.close();
+ if (job != null) {
+ FileUtil.fullyDelete(
+ new File(job.getConfiguration().get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ private void verify(TableName tableName) throws IOException {
+ Table table = UTIL.getConnection().getTable(tableName);
+ boolean verified = false;
+ long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ LOG.info("Verification attempt #" + i);
+ verifyAttempt(table);
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume it's because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ LOG.debug("Verification attempt failed: " + e.getMessage());
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ assertTrue(verified);
+ table.close();
+ }
+
+ /**
+ * Looks at every value of the mapreduce output and verifies that indeed
+ * the values have been reversed.
+ *
+ * @param table Table to scan.
+ * @throws IOException
+ * @throws NullPointerException if we failed to find a cell value
+ */
+ private void verifyAttempt(final Table table)
+ throws IOException, NullPointerException {
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ scan.addFamily(OUTPUT_FAMILY);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ Iterator<Result> itr = scanner.iterator();
+ assertTrue(itr.hasNext());
+ while(itr.hasNext()) {
+ Result r = itr.next();
+ if (LOG.isDebugEnabled()) {
+ if (r.size() > 2) {
+ throw new IOException("Too many results, expected 2 got " +
+ r.size());
+ }
+ }
+ byte[] firstValue = null;
+ byte[] secondValue = null;
+ int count = 0;
+ for (Cell kv : r.listCells()) {
+ if (count == 0) {
+ firstValue = CellUtil.cloneValue(kv);
+ } else if (count == 1) {
+ secondValue = CellUtil.cloneValue(kv);
+ } else if (count == 2) {
+ break;
+ }
+ count++;
+ }
+ String first = "";
+ if (firstValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": first value is null");
+ }
+ first = Bytes.toString(firstValue);
+ String second = "";
+ if (secondValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": second value is null");
+ }
+ byte[] secondReversed = new byte[secondValue.length];
+ for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+ secondReversed[i] = secondValue[j];
+ }
+ second = Bytes.toString(secondReversed);
+ if (first.compareTo(second) != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("second key is not the reverse of first. row=" +
+ Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+ ", second value=" + second);
+ }
+ fail();
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+
+}
+
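[Editorial note] For readers skimming the diff: the job wiring that runTestOnTable exercises above is the general recipe for MultithreadedTableMapper. A minimal driver sketch, reusing only the calls shown in the test; the table name "mytable" and the thread count are illustrative placeholders:

    Job job = new Job(conf, "process column contents");
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILY);
    TableMapReduceUtil.initTableMapperJob(TableName.valueOf("mytable"), scan,
        MultithreadedTableMapper.class, ImmutableBytesWritable.class, Put.class, job);
    // the wrapped mapper that does the real work, run on N threads per map task
    MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
    MultithreadedTableMapper.setNumberOfThreads(job, 10);
    TableMapReduceUtil.initTableReducerJob("mytable", IdentityTableReducer.class, job);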
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java
new file mode 100644
index 0000000..301cfef
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRegionSizeCalculator.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_REGIONSERVER_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestRegionSizeCalculator {
+
+ private Configuration configuration = new Configuration();
+ private final long megabyte = 1024L * 1024L;
+ private final ServerName sn = ServerName.valueOf("local-rs", DEFAULT_REGIONSERVER_PORT,
+ ServerName.NON_STARTCODE);
+
+ @Test
+ public void testSimpleTestCase() throws Exception {
+
+ RegionLocator regionLocator = mockRegionLocator("region1", "region2", "region3");
+
+ Admin admin = mockAdmin(
+ mockRegion("region1", 123),
+ mockRegion("region3", 1232),
+ mockRegion("region2", 54321)
+ );
+
+ RegionSizeCalculator calculator = new RegionSizeCalculator(regionLocator, admin);
+
+ assertEquals(123 * megabyte, calculator.getRegionSize("region1".getBytes()));
+ assertEquals(54321 * megabyte, calculator.getRegionSize("region2".getBytes()));
+ assertEquals(1232 * megabyte, calculator.getRegionSize("region3".getBytes()));
+ // if regionCalculator does not know about a region, it should return 0
+ assertEquals(0 * megabyte, calculator.getRegionSize("otherTableRegion".getBytes()));
+
+ assertEquals(3, calculator.getRegionSizeMap().size());
+ }
+
+
+ /**
+ * When the size of a region in megabytes is larger than the largest possible
+ * integer, a naive conversion to bytes can overflow and lose precision.
+ * */
+ @Test
+ public void testLargeRegion() throws Exception {
+
+ RegionLocator regionLocator = mockRegionLocator("largeRegion");
+
+ Admin admin = mockAdmin(
+ mockRegion("largeRegion", Integer.MAX_VALUE)
+ );
+
+ RegionSizeCalculator calculator = new RegionSizeCalculator(regionLocator, admin);
+
+ assertEquals(((long) Integer.MAX_VALUE) * megabyte, calculator.getRegionSize("largeRegion".getBytes()));
+ }
+
+ /** When calculator is disabled, it should return 0 for each request.*/
+ @Test
+ public void testDisabled() throws Exception {
+ String regionName = "cz.goout:/index.html";
+ RegionLocator table = mockRegionLocator(regionName);
+
+ Admin admin = mockAdmin(
+ mockRegion(regionName, 999)
+ );
+
+ //first request on enabled calculator
+ RegionSizeCalculator calculator = new RegionSizeCalculator(table, admin);
+ assertEquals(999 * megabyte, calculator.getRegionSize(regionName.getBytes()));
+
+ //then disabled calculator.
+ configuration.setBoolean(RegionSizeCalculator.ENABLE_REGIONSIZECALCULATOR, false);
+ RegionSizeCalculator disabledCalculator = new RegionSizeCalculator(table, admin);
+ assertEquals(0 * megabyte, disabledCalculator.getRegionSize(regionName.getBytes()));
+
+ assertEquals(0, disabledCalculator.getRegionSizeMap().size());
+ }
+
+ /**
+ * Creates a mock RegionLocator for a table with the given region names.
+ * */
+ private RegionLocator mockRegionLocator(String... regionNames) throws IOException {
+ RegionLocator mockedTable = Mockito.mock(RegionLocator.class);
+ when(mockedTable.getName()).thenReturn(TableName.valueOf("sizeTestTable"));
+ List<HRegionLocation> regionLocations = new ArrayList<>(regionNames.length);
+ when(mockedTable.getAllRegionLocations()).thenReturn(regionLocations);
+
+ for (String regionName : regionNames) {
+ HRegionInfo info = Mockito.mock(HRegionInfo.class);
+ when(info.getRegionName()).thenReturn(regionName.getBytes());
+ regionLocations.add(new HRegionLocation(info, sn));
+ }
+
+ return mockedTable;
+ }
+
+ /**
+ * Creates a mock Admin returning RegionLoad info for the given regions.
+ */
+ private Admin mockAdmin(RegionLoad... regionLoadArray) throws Exception {
+ Admin mockAdmin = Mockito.mock(Admin.class);
+ Map<byte[], RegionLoad> regionLoads = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (RegionLoad regionLoad : regionLoadArray) {
+ regionLoads.put(regionLoad.getName(), regionLoad);
+ }
+ when(mockAdmin.getConfiguration()).thenReturn(configuration);
+ when(mockAdmin.getRegionLoad(sn, TableName.valueOf("sizeTestTable"))).thenReturn(regionLoads);
+ return mockAdmin;
+ }
+
+ /**
+ * Creates mock of region with given name and size.
+ *
+ * @param fileSizeMb size of the region's store files, in megabytes
+ * */
+ private RegionLoad mockRegion(String regionName, int fileSizeMb) {
+ RegionLoad region = Mockito.mock(RegionLoad.class);
+ when(region.getName()).thenReturn(regionName.getBytes());
+ when(region.getNameAsString()).thenReturn(regionName);
+ when(region.getStorefileSizeMB()).thenReturn(fileSizeMb);
+ return region;
+ }
+}
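[Editorial note] On testLargeRegion above: RegionLoad.getStorefileSizeMB() returns an int, so converting to bytes must widen to long before multiplying, or the product overflows. A minimal sketch of the failure mode in plain Java:

    int sizeMb = Integer.MAX_VALUE;
    long wrong = sizeMb * 1024 * 1024;          // int multiply overflows before widening
    long right = ((long) sizeMb) * 1024 * 1024; // widen first, as the calculator must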
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
new file mode 100644
index 0000000..3b84e2d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRowCounter.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.LauncherSecurityManager;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+
+/**
+ * Test the rowcounter map reduce job.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestRowCounter {
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+ private static final Log LOG = LogFactory.getLog(TestRowCounter.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final static String TABLE_NAME = "testRowCounter";
+ private final static String TABLE_NAME_TS_RANGE = "testRowCounter_ts_range";
+ private final static String COL_FAM = "col_fam";
+ private final static String COL1 = "c1";
+ private final static String COL2 = "c2";
+ private final static String COMPOSITE_COLUMN = "C:A:A";
+ private final static int TOTAL_ROWS = 10;
+ private final static int ROWS_WITH_ONE_COL = 2;
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster();
+ Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM));
+ writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
+ table.close();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Test a case when no column was specified in command line arguments.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterNoColumn() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME
+ };
+ runRowCount(args, 10);
+ }
+
+ /**
+ * Test a case when the column specified in command line arguments is
+ * exclusive to a few rows.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterExclusiveColumn() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, COL_FAM + ":" + COL1
+ };
+ runRowCount(args, 8);
+ }
+
+ /**
+ * Test a case when the column specified in command line arguments is
+ * one for which the qualifier contains colons.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterColumnWithColonInQualifier() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, COL_FAM + ":" + COMPOSITE_COLUMN
+ };
+ runRowCount(args, 8);
+ }
+
+ /**
+ * Test a case when the column specified in command line arguments is not part
+ * of first KV for a row.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterHiddenColumn() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, COL_FAM + ":" + COL2
+ };
+ runRowCount(args, 10);
+ }
+
+
+ /**
+ * Test a case when the column specified in command line arguments is
+ * exclusive to a few rows and a row range filter is also specified.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterColumnAndRowRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=\\x00rov,\\x00rox", COL_FAM + ":" + COL1
+ };
+ runRowCount(args, 8);
+ }
+
+ /**
+ * Test a case when a range is specified with a single range of start-end keys.
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterRowSingleRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=\\x00row1,\\x00row3"
+ };
+ runRowCount(args, 2);
+ }
+
+ /**
+ * Test a case when a range is specified with a single range with only an end key.
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterRowSingleRangeUpperBound() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=,\\x00row3"
+ };
+ runRowCount(args, 3);
+ }
+
+ /**
+ * Test a case when a range is specified with two ranges, one of which has only an end key.
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterRowMultiRangeUpperBound() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=,\\x00row3;\\x00row5,\\x00row7"
+ };
+ runRowCount(args, 5);
+ }
+
+ /**
+ * Test a case when a range is specified with multiple ranges of start-end keys
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterRowMultiRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=\\x00row1,\\x00row3;\\x00row5,\\x00row8"
+ };
+ runRowCount(args, 5);
+ }
+
+ /**
+ * Test a case when a range is specified with multiple ranges of start-end keys;
+ * one range is filled, the other two are empty.
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterRowMultiEmptyRange() throws Exception {
+ String[] args = new String[] {
+ TABLE_NAME, "--range=\\x00row1,\\x00row3;;"
+ };
+ runRowCount(args, 2);
+ }
+
+ @Test
+ public void testRowCounter10kRowRange() throws Exception {
+ String tableName = TABLE_NAME + "10k";
+
+ try (Table table = TEST_UTIL.createTable(
+ TableName.valueOf(tableName), Bytes.toBytes(COL_FAM))) {
+ writeRows(table, 10000, 0);
+ }
+ String[] args = new String[] {
+ tableName, "--range=\\x00row9872,\\x00row9875"
+ };
+ runRowCount(args, 3);
+ }
+
+ /**
+ * Test a case when the timerange is specified with --starttime and --endtime options
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRowCounterTimeRange() throws Exception {
+ final byte[] family = Bytes.toBytes(COL_FAM);
+ final byte[] col1 = Bytes.toBytes(COL1);
+ Put put1 = new Put(Bytes.toBytes("row_timerange_" + 1));
+ Put put2 = new Put(Bytes.toBytes("row_timerange_" + 2));
+ Put put3 = new Put(Bytes.toBytes("row_timerange_" + 3));
+
+ long ts;
+
+ // use a dedicated table for the timerange data
+ Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME_TS_RANGE), Bytes.toBytes(COL_FAM));
+
+ ts = System.currentTimeMillis();
+ put1.addColumn(family, col1, ts, Bytes.toBytes("val1"));
+ table.put(put1);
+ Thread.sleep(100);
+
+ ts = System.currentTimeMillis();
+ put2.addColumn(family, col1, ts, Bytes.toBytes("val2"));
+ put3.addColumn(family, col1, ts, Bytes.toBytes("val3"));
+ table.put(put2);
+ table.put(put3);
+ table.close();
+
+ String[] args = new String[] {
+ TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
+ "--starttime=" + 0,
+ "--endtime=" + ts
+ };
+ runRowCount(args, 1);
+
+ args = new String[] {
+ TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
+ "--starttime=" + 0,
+ "--endtime=" + (ts - 10)
+ };
+ runRowCount(args, 1);
+
+ args = new String[] {
+ TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
+ "--starttime=" + ts,
+ "--endtime=" + (ts + 1000)
+ };
+ runRowCount(args, 2);
+
+ args = new String[] {
+ TABLE_NAME_TS_RANGE, COL_FAM + ":" + COL1,
+ "--starttime=" + (ts - 30 * 1000),
+ "--endtime=" + (ts + 30 * 1000),
+ };
+ runRowCount(args, 3);
+ }
+
+ /**
+ * Run the RowCounter map reduce job and verify the row count.
+ *
+ * @param args the command line arguments to be used for rowcounter job.
+ * @param expectedCount the expected row count (result of map reduce job).
+ * @throws Exception
+ */
+ private void runRowCount(String[] args, int expectedCount) throws Exception {
+ Job job = RowCounter.createSubmittableJob(TEST_UTIL.getConfiguration(), args);
+ long start = System.currentTimeMillis();
+ job.waitForCompletion(true);
+ long duration = System.currentTimeMillis() - start;
+ LOG.debug("row count duration (ms): " + duration);
+ assertTrue(job.isSuccessful());
+ Counter counter = job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS);
+ assertEquals(expectedCount, counter.getValue());
+ }
+
+ /**
+ * Writes totalRows distinct rows into the table. Most rows have three
+ * columns; the last rowsWithOneCol rows have only one.
+ *
+ * @param table
+ * @throws IOException
+ */
+ private static void writeRows(Table table, int totalRows, int rowsWithOneCol) throws IOException {
+ final byte[] family = Bytes.toBytes(COL_FAM);
+ final byte[] value = Bytes.toBytes("abcd");
+ final byte[] col1 = Bytes.toBytes(COL1);
+ final byte[] col2 = Bytes.toBytes(COL2);
+ final byte[] col3 = Bytes.toBytes(COMPOSITE_COLUMN);
+ ArrayList<Put> rowsUpdate = new ArrayList<>();
+ // write most rows with three columns
+ int i = 0;
+ for (; i < totalRows - rowsWithOneCol; i++) {
+ // Use binary rows values to test for HBASE-15287.
+ byte[] row = Bytes.toBytesBinary("\\x00row" + i);
+ Put put = new Put(row);
+ put.addColumn(family, col1, value);
+ put.addColumn(family, col2, value);
+ put.addColumn(family, col3, value);
+ rowsUpdate.add(put);
+ }
+
+ // write the remaining rows with only one column
+ for (; i < totalRows; i++) {
+ byte[] row = Bytes.toBytes("row" + i);
+ Put put = new Put(row);
+ put.addColumn(family, col2, value);
+ rowsUpdate.add(put);
+ }
+ table.put(rowsUpdate);
+ }
+
+ /**
+ * Test the main method. RowCounter should print usage and call System.exit.
+ */
+ @Test
+ public void testImportMain() throws Exception {
+ PrintStream oldPrintStream = System.err;
+ SecurityManager SECURITY_MANAGER = System.getSecurityManager();
+ LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
+ System.setSecurityManager(newSecurityManager);
+ ByteArrayOutputStream data = new ByteArrayOutputStream();
+ String[] args = {};
+ try {
+ System.setErr(new PrintStream(data));
+
+ try {
+ RowCounter.main(args);
+ fail("should be SecurityException");
+ } catch (SecurityException e) {
+ assertEquals(-1, newSecurityManager.getExitCode());
+ assertTrue(data.toString().contains("Wrong number of parameters:"));
+ assertTrue(data.toString().contains(
+ "Usage: RowCounter [options] <tablename> " +
+ "[--starttime=[start] --endtime=[end] " +
+ "[--range=[startKey],[endKey][;[startKey],[endKey]...]] " +
+ "[<column1> <column2>...]"));
+ assertTrue(data.toString().contains("-Dhbase.client.scanner.caching=100"));
+ assertTrue(data.toString().contains("-Dmapreduce.map.speculative=false"));
+ }
+ data.reset();
+ try {
+ args = new String[2];
+ args[0] = "table";
+ args[1] = "--range=1";
+ RowCounter.main(args);
+ fail("should be SecurityException");
+ } catch (SecurityException e) {
+ assertEquals(-1, newSecurityManager.getExitCode());
+ assertTrue(data.toString().contains(
+ "Please specify range in such format as \"--range=a,b\" or, with only one boundary," +
+ " \"--range=,b\" or \"--range=a,\""));
+ assertTrue(data.toString().contains(
+ "Usage: RowCounter [options] <tablename> " +
+ "[--starttime=[start] --endtime=[end] " +
+ "[--range=[startKey],[endKey][;[startKey],[endKey]...]] " +
+ "[<column1> <column2>...]"));
+ }
+
+ } finally {
+ System.setErr(oldPrintStream);
+ System.setSecurityManager(SECURITY_MANAGER);
+ }
+
+ }
+
+}
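[Editorial note] The runRowCount helper above doubles as a recipe for driving RowCounter programmatically. A minimal sketch, reusing only calls that appear in the test; the table, range, and column names are placeholders:

    String[] args = { "mytable", "--range=\\x00row1,\\x00row3", "col_fam:c1" };
    Job job = RowCounter.createSubmittableJob(conf, args);
    job.waitForCompletion(true);
    long rows = job.getCounters()
        .findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue();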
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
new file mode 100644
index 0000000..78fddbc
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode.
+ * This suite is unable to verify the security handoff/turnover,
+ * as the MiniCluster runs as the system user and thus has root
+ * privileges, and delegation tokens don't seem to work on miniDFS.
+ *
+ * Thus SecureBulkload can only be completely verified by running
+ * integration tests against a secure cluster. This suite is still
+ * invaluable as it verifies the other mechanisms that need to be
+ * supported as part of a LoadIncrementalHFiles call.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(util.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+ // setup configuration
+ SecureTestUtil.enableSecurity(util.getConfiguration());
+ util.getConfiguration().setInt(
+ LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+ MAX_FILES_PER_REGION_PER_FAMILY);
+ // change default behavior so that tag values are returned with normal rpcs
+ util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+ KeyValueCodecWithTags.class.getCanonicalName());
+
+ util.startMiniCluster();
+
+ // Wait for the ACL table to become available
+ util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+
+ setupNamespace();
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..0e877ad
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+/**
+ * Reruns TestLoadIncrementalHFilesSplitRecovery
+ * using LoadIncrementalHFiles in secure mode.
+ * This suite is unable to verify the security handoff/turnover,
+ * as the MiniCluster runs as the system user and thus has root
+ * privileges, and delegation tokens don't seem to work on miniDFS.
+ *
+ * Thus SecureBulkload can only be completely verified by running
+ * integration tests against a secure cluster. This suite is still
+ * invaluable as it verifies the other mechanisms that need to be
+ * supported as part of a LoadIncrementalHFiles call.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrementalHFilesSplitRecovery {
+
+ // This "overrides" the parent static method;
+ // make sure they are in sync.
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ util = new HBaseTestingUtility();
+ // set the always on security provider
+ UserProvider.setUserProviderForTesting(util.getConfiguration(),
+ HadoopSecurityEnabledUserProviderForTesting.class);
+ // setup configuration
+ SecureTestUtil.enableSecurity(util.getConfiguration());
+
+ util.startMiniCluster();
+
+ // Wait for the ACL table to become available
+ util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+ }
+
+ // Disabling this test as it does not work in secure mode.
+ @Test (timeout=180000)
+ @Override
+ public void testBulkLoadPhaseFailure() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java
new file mode 100644
index 0000000..5629cb4
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSimpleTotalOrderPartitioner.java
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.experimental.categories.Category;
+
+import org.junit.Test;
+
+/**
+ * Test of simple partitioner.
+ */
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestSimpleTotalOrderPartitioner {
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Configuration conf = TEST_UTIL.getConfiguration();
+
+ @Test
+ public void testSplit() throws Exception {
+ String start = "a";
+ String end = "{";
+ SimpleTotalOrderPartitioner<byte []> p = new SimpleTotalOrderPartitioner<>();
+
+ this.conf.set(SimpleTotalOrderPartitioner.START, start);
+ this.conf.set(SimpleTotalOrderPartitioner.END, end);
+ p.setConf(this.conf);
+ ImmutableBytesWritable c = new ImmutableBytesWritable(Bytes.toBytes("c"));
+ // If one reduce, partition should be 0.
+ int partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 1);
+ assertEquals(0, partition);
+ // If two reduces, partition should be 0.
+ partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 2);
+ assertEquals(0, partition);
+ // Divide in 3.
+ partition = p.getPartition(c, HConstants.EMPTY_BYTE_ARRAY, 3);
+ assertEquals(0, partition);
+ ImmutableBytesWritable q = new ImmutableBytesWritable(Bytes.toBytes("q"));
+ partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 2);
+ assertEquals(1, partition);
+ partition = p.getPartition(q, HConstants.EMPTY_BYTE_ARRAY, 3);
+ assertEquals(2, partition);
+ // What about end and start keys.
+ ImmutableBytesWritable startBytes =
+ new ImmutableBytesWritable(Bytes.toBytes(start));
+ partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 2);
+ assertEquals(0, partition);
+ partition = p.getPartition(startBytes, HConstants.EMPTY_BYTE_ARRAY, 3);
+ assertEquals(0, partition);
+ ImmutableBytesWritable endBytes =
+ new ImmutableBytesWritable(Bytes.toBytes("z"));
+ partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 2);
+ assertEquals(1, partition);
+ partition = p.getPartition(endBytes, HConstants.EMPTY_BYTE_ARRAY, 3);
+ assertEquals(2, partition);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
new file mode 100644
index 0000000..9a0c160
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+
+/**
+ * Basic test for the SyncTable M/R tool
+ */
+@Category(LargeTests.class)
+public class TestSyncTable {
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+ private static final Log LOG = LogFactory.getLog(TestSyncTable.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private static byte[][] generateSplits(int numRows, int numRegions) {
+ byte[][] splitRows = new byte[numRegions-1][];
+ for (int i = 1; i < numRegions; i++) {
+ splitRows[i-1] = Bytes.toBytes(numRows * i / numRegions);
+ }
+ return splitRows;
+ }
+
+ @Test
+ public void testSyncTable() throws Exception {
+ final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source");
+ final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target");
+ Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
+
+ writeTestData(sourceTableName, targetTableName);
+ hashSourceTable(sourceTableName, testDir);
+ Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir);
+ assertEqualTables(90, sourceTableName, targetTableName);
+
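+ // Expected totals, from summing the per-group comments in writeTestData():
+ // ROWSWITHDIFFS: 10 rows in each of the 6 differing groups = 60
+ // SOURCEMISSINGCELLS: 20 + 10 + 20 = 50; TARGETMISSINGCELLS: 20 + 10 + 20 = 50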
+ assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
+ assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
+ assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
+ assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
+ assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
+ assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
+
+ TEST_UTIL.deleteTable(sourceTableName);
+ TEST_UTIL.deleteTable(targetTableName);
+ TEST_UTIL.cleanupDataTestDirOnTestFS();
+ }
+
+ private void assertEqualTables(int expectedRows, TableName sourceTableName,
+ TableName targetTableName) throws Exception {
+ Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
+ Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
+
+ ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
+ ResultScanner targetScanner = targetTable.getScanner(new Scan());
+
+ for (int i = 0; i < expectedRows; i++) {
+ Result sourceRow = sourceScanner.next();
+ Result targetRow = targetScanner.next();
+
+ LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow()))
+ + " cells:" + sourceRow);
+ LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow()))
+ + " cells:" + targetRow);
+
+ if (sourceRow == null) {
+ Assert.fail("Expected " + expectedRows
+ + " source rows but only found " + i);
+ }
+ if (targetRow == null) {
+ Assert.fail("Expected " + expectedRows
+ + " target rows but only found " + i);
+ }
+ Cell[] sourceCells = sourceRow.rawCells();
+ Cell[] targetCells = targetRow.rawCells();
+ if (sourceCells.length != targetCells.length) {
+ LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+ LOG.debug("Target cells: " + Arrays.toString(targetCells));
+ Assert.fail("Row " + Bytes.toInt(sourceRow.getRow())
+ + " has " + sourceCells.length
+ + " cells in source table but " + targetCells.length
+ + " cells in target table");
+ }
+ for (int j = 0; j < sourceCells.length; j++) {
+ Cell sourceCell = sourceCells[j];
+ Cell targetCell = targetCells[j];
+ try {
+ if (!CellUtil.matchingRow(sourceCell, targetCell)) {
+ Assert.fail("Rows don't match");
+ }
+ if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
+ Assert.fail("Families don't match");
+ }
+ if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
+ Assert.fail("Qualifiers don't match");
+ }
+ if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
+ Assert.fail("Timestamps don't match");
+ }
+ if (!CellUtil.matchingValue(sourceCell, targetCell)) {
+ Assert.fail("Values don't match");
+ }
+ } catch (Throwable t) {
+ LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell);
+ Throwables.propagate(t);
+ }
+ }
+ }
+ Result sourceRow = sourceScanner.next();
+ if (sourceRow != null) {
+ Assert.fail("Source table has more than " + expectedRows
+ + " rows. Next row: " + Bytes.toInt(sourceRow.getRow()));
+ }
+ Result targetRow = targetScanner.next();
+ if (targetRow != null) {
+ Assert.fail("Target table has more than " + expectedRows
+ + " rows. Next row: " + Bytes.toInt(targetRow.getRow()));
+ }
+ sourceScanner.close();
+ targetScanner.close();
+ sourceTable.close();
+ targetTable.close();
+ }
+
+ private Counters syncTables(TableName sourceTableName, TableName targetTableName,
+ Path testDir) throws Exception {
+ SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
+ int code = syncTable.run(new String[] {
+ testDir.toString(),
+ sourceTableName.getNameAsString(),
+ targetTableName.getNameAsString()
+ });
+ assertEquals("sync table job failed", 0, code);
+
+ LOG.info("Sync tables completed");
+ return syncTable.counters;
+ }
+
+ private void hashSourceTable(TableName sourceTableName, Path testDir)
+ throws Exception {
+ int numHashFiles = 3;
+ long batchSize = 100; // should be 2 batches per region
+ int scanBatch = 1;
+ HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
+ int code = hashTable.run(new String[] {
+ "--batchsize=" + batchSize,
+ "--numhashfiles=" + numHashFiles,
+ "--scanbatch=" + scanBatch,
+ sourceTableName.getNameAsString(),
+ testDir.toString()});
+ assertEquals("hash table job failed", 0, code);
+
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+ HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
+ assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
+ assertEquals(batchSize, tableHash.batchSize);
+ assertEquals(numHashFiles, tableHash.numHashFiles);
+ assertEquals(numHashFiles - 1, tableHash.partitions.size());
+
+ LOG.info("Hash table completed");
+ }
+
+ private void writeTestData(TableName sourceTableName, TableName targetTableName)
+ throws Exception {
+ final byte[] family = Bytes.toBytes("family");
+ final byte[] column1 = Bytes.toBytes("c1");
+ final byte[] column2 = Bytes.toBytes("c2");
+ final byte[] value1 = Bytes.toBytes("val1");
+ final byte[] value2 = Bytes.toBytes("val2");
+ final byte[] value3 = Bytes.toBytes("val3");
+
+ int numRows = 100;
+ int sourceRegions = 10;
+ int targetRegions = 6;
+
+ Table sourceTable = TEST_UTIL.createTable(sourceTableName,
+ family, generateSplits(numRows, sourceRegions));
+
+ Table targetTable = TEST_UTIL.createTable(targetTableName,
+ family, generateSplits(numRows, targetRegions));
+
+ long timestamp = 1430764183454L;
+
+ int rowIndex = 0;
+ // a bunch of identical rows
+ for (; rowIndex < 40; rowIndex++) {
+ Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+ sourcePut.addColumn(family, column1, timestamp, value1);
+ sourcePut.addColumn(family, column2, timestamp, value2);
+ sourceTable.put(sourcePut);
+
+ Put targetPut = new Put(Bytes.toBytes(rowIndex));
+ targetPut.addColumn(family, column1, timestamp, value1);
+ targetPut.addColumn(family, column2, timestamp, value2);
+ targetTable.put(targetPut);
+ }
+ // some rows only in the source table
+ // ROWSWITHDIFFS: 10
+ // TARGETMISSINGROWS: 10
+ // TARGETMISSINGCELLS: 20
+ for (; rowIndex < 50; rowIndex++) {
+ Put put = new Put(Bytes.toBytes(rowIndex));
+ put.addColumn(family, column1, timestamp, value1);
+ put.addColumn(family, column2, timestamp, value2);
+ sourceTable.put(put);
+ }
+ // some rows only in the target table
+ // ROWSWITHDIFFS: 10
+ // SOURCEMISSINGROWS: 10
+ // SOURCEMISSINGCELLS: 20
+ for (; rowIndex < 60; rowIndex++) {
+ Put put = new Put(Bytes.toBytes(rowIndex));
+ put.addColumn(family, column1, timestamp, value1);
+ put.addColumn(family, column2, timestamp, value2);
+ targetTable.put(put);
+ }
+ // some rows with 1 missing cell in target table
+ // ROWSWITHDIFFS: 10
+ // TARGETMISSINGCELLS: 10
+ for (; rowIndex < 70; rowIndex++) {
+ Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+ sourcePut.addColumn(family, column1, timestamp, value1);
+ sourcePut.addColumn(family, column2, timestamp, value2);
+ sourceTable.put(sourcePut);
+
+ Put targetPut = new Put(Bytes.toBytes(rowIndex));
+ targetPut.addColumn(family, column1, timestamp, value1);
+ targetTable.put(targetPut);
+ }
+ // some rows with 1 missing cell in source table
+ // ROWSWITHDIFFS: 10
+ // SOURCEMISSINGCELLS: 10
+ for (; rowIndex < 80; rowIndex++) {
+ Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+ sourcePut.addColumn(family, column1, timestamp, value1);
+ sourceTable.put(sourcePut);
+
+ Put targetPut = new Put(Bytes.toBytes(rowIndex));
+ targetPut.addColumn(family, column1, timestamp, value1);
+ targetPut.addColumn(family, column2, timestamp, value2);
+ targetTable.put(targetPut);
+ }
+ // some rows differing only in timestamp
+ // ROWSWITHDIFFS: 10
+ // SOURCEMISSINGCELLS: 20
+ // TARGETMISSINGCELLS: 20
+ for (; rowIndex < 90; rowIndex++) {
+ Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+ sourcePut.addColumn(family, column1, timestamp, column1);
+ sourcePut.addColumn(family, column2, timestamp, value2);
+ sourceTable.put(sourcePut);
+
+ Put targetPut = new Put(Bytes.toBytes(rowIndex));
+ targetPut.addColumn(family, column1, timestamp+1, column1);
+ targetPut.addColumn(family, column2, timestamp-1, value2);
+ targetTable.put(targetPut);
+ }
+ // some rows with different values
+ // ROWSWITHDIFFS: 10
+ // DIFFERENTCELLVALUES: 20
+ for (; rowIndex < numRows; rowIndex++) {
+ Put sourcePut = new Put(Bytes.toBytes(rowIndex));
+ sourcePut.addColumn(family, column1, timestamp, value1);
+ sourcePut.addColumn(family, column2, timestamp, value2);
+ sourceTable.put(sourcePut);
+
+ Put targetPut = new Put(Bytes.toBytes(rowIndex));
+ targetPut.addColumn(family, column1, timestamp, value3);
+ targetPut.addColumn(family, column2, timestamp, value3);
+ targetTable.put(targetPut);
+ }
+
+ sourceTable.close();
+ targetTable.close();
+ }
+
+
+}
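[Editorial note] The hashSourceTable/syncTables pair above mirrors the operational flow: HashTable first writes hash files for the source table into a directory, then SyncTable reads them and reconciles the target. A minimal sketch with placeholder table and path names:

    HashTable hashTable = new HashTable(conf);
    int rc = hashTable.run(new String[] { "--batchsize=100", "--numhashfiles=3",
        "--scanbatch=1", "sourceTable", "/tmp/hashes" });
    SyncTable syncTable = new SyncTable(conf);
    rc = syncTable.run(new String[] { "/tmp/hashes", "sourceTable", "targetTable" });
    // per-cell drift is reported in syncTable.counters (see the asserts in testSyncTable)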
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
new file mode 100644
index 0000000..b4c6ab9
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormat.java
@@ -0,0 +1,481 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests TableInputFormat and its recovery semantics.
+ */
+@Category(LargeTests.class)
+public class TestTableInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
+
+ private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private static MiniMRCluster mrCluster;
+ static final byte[] FAMILY = Bytes.toBytes("family");
+
+ private static final byte[][] columns = new byte[][] { FAMILY };
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws IOException {
+ LOG.info("before");
+ UTIL.ensureSomeRegionServersAvailable(1);
+ LOG.info("before done");
+ }
+
+ /**
+ * Set up a table with two rows and values.
+ *
+ * @param tableName the name of the table to create
+ * @return the created table, populated with rows "aaa" and "bbb"
+ * @throws IOException
+ */
+ public static Table createTable(byte[] tableName) throws IOException {
+ return createTable(tableName, new byte[][] { FAMILY });
+ }
+
+ /**
+ * Set up a table with two rows and a value per column family.
+ *
+ * @param tableName the name of the table to create
+ * @param families the column families to create
+ * @return the created table, populated with rows "aaa" and "bbb"
+ * @throws IOException
+ */
+ public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
+ Table table = UTIL.createTable(TableName.valueOf(tableName), families);
+ Put p = new Put("aaa".getBytes());
+ for (byte[] family : families) {
+ p.addColumn(family, null, "value aaa".getBytes());
+ }
+ table.put(p);
+ p = new Put("bbb".getBytes());
+ for (byte[] family : families) {
+ p.addColumn(family, null, "value bbb".getBytes());
+ }
+ table.put(p);
+ return table;
+ }
+
+ /**
+ * Verify that the result and key have the expected values.
+ *
+ * @param r the result read from the table
+ * @param key the row key read alongside the result
+ * @param expectedKey the expected row key
+ * @param expectedValue the expected cell value
+ * @return true when the checks pass
+ */
+ static boolean checkResult(Result r, ImmutableBytesWritable key,
+ byte[] expectedKey, byte[] expectedValue) {
+ assertEquals(0, key.compareTo(expectedKey));
+ Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
+ byte[] value = vals.values().iterator().next();
+ assertTrue(Arrays.equals(value, expectedValue));
+ return true; // only reached when the assertions above pass
+ }
+
+ /**
+ * Run the TableRecordReaderImpl over the given table using the
+ * o.a.h.hbase.mapreduce API and verify the rows it returns.
+ *
+ * @param table the table to read; expected to contain the two rows from createTable()
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ static void runTestMapreduce(Table table) throws IOException,
+ InterruptedException {
+ org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl trr =
+ new org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl();
+ Scan s = new Scan();
+ s.setStartRow("aaa".getBytes());
+ s.setStopRow("zzz".getBytes());
+ s.addFamily(FAMILY);
+ trr.setScan(s);
+ trr.setHTable(table);
+
+ trr.initialize(null, null);
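+ // Passing null for the InputSplit and TaskAttemptContext appears to be safe
+ // here: the test drives the reader directly, and the scan and table were
+ // already supplied via setScan()/setHTable() above.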
+ Result r;
+ ImmutableBytesWritable key;
+
+ boolean more = trr.nextKeyValue();
+ assertTrue(more);
+ key = trr.getCurrentKey();
+ r = trr.getCurrentValue();
+ checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
+
+ more = trr.nextKeyValue();
+ assertTrue(more);
+ key = trr.getCurrentKey();
+ r = trr.getCurrentValue();
+ checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
+
+ // no more data
+ more = trr.nextKeyValue();
+ assertFalse(more);
+ }
+
+ /**
+ * Create a table spy whose first failCnt scanners throw an IOException
+ * from next().
+ *
+ * @throws IOException
+ */
+ static Table createIOEScannerTable(byte[] name, final int failCnt)
+ throws IOException {
+ // build up a mock scanner that fails for the first failCnt invocations
+ Answer<ResultScanner> a = new Answer<ResultScanner>() {
+ int cnt = 0;
+
+ @Override
+ public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
+ // the first failCnt invocations return the broken mock scanner
+ if (cnt++ < failCnt) {
+ // create mock ResultScanner that always fails.
+ Scan scan = mock(Scan.class);
+ doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
+ ResultScanner scanner = mock(ResultScanner.class);
+ // simulate TimeoutException / IOException
+ doThrow(new IOException("Injected exception")).when(scanner).next();
+ return scanner;
+ }
+
+ // otherwise return the real scanner.
+ return (ResultScanner) invocation.callRealMethod();
+ }
+ };
+
+ Table htable = spy(createTable(name));
+ doAnswer(a).when(htable).getScanner((Scan) anyObject());
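+ // Every getScanner() call on the spy routes through the Answer above; the
+ // record reader under test is expected to retry and recover transparently
+ // from the injected failures.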
+ return htable;
+ }
+
+ /**
+ * Create a table spy whose first failCnt scanners throw a
+ * NotServingRegionException from next().
+ *
+ * @throws IOException
+ */
+ static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
+ throws IOException {
+ // build up a mock scanner that fails for the first failCnt invocations
+ Answer<ResultScanner> a = new Answer<ResultScanner>() {
+ int cnt = 0;
+
+ @Override
+ public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
+ // the first failCnt invocations return the broken mock scanner
+ if (cnt++ < failCnt) {
+ // create mock ResultScanner that always fails.
+ Scan scan = mock(Scan.class);
+ doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
+ ResultScanner scanner = mock(ResultScanner.class);
+
+ invocation.callRealMethod(); // creates (and discards) a real scanner first
+ // simulate a NotServingRegionException from next()
+ doThrow(
+ new NotServingRegionException("Injected simulated NotServingRegionException"))
+ .when(scanner).next();
+ return scanner;
+ }
+
+ // otherwise return the real scanner.
+ return (ResultScanner) invocation.callRealMethod();
+ }
+ };
+
+ Table htable = spy(createTable(name));
+ doAnswer(a).when(htable).getScanner((Scan) anyObject());
+ return htable;
+ }
+
+ /**
+ * Run the test assuming no errors, using the newer mapreduce API.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTableRecordReaderMapreduce() throws IOException,
+ InterruptedException {
+ Table table = createTable("table1-mr".getBytes());
+ runTestMapreduce(table);
+ }
+
+ /**
+ * Run the test assuming a single Scanner IOException failure, using the newer
+ * mapreduce API; the record reader should retry and recover.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTableRecordReaderScannerFailMapreduce() throws IOException,
+ InterruptedException {
+ Table htable = createIOEScannerTable("table2-mr".getBytes(), 1);
+ runTestMapreduce(htable);
+ }
+
+ /**
+ * Run the test assuming Scanner IOException failures on two consecutive
+ * scanner invocations, using the newer mapreduce API; the second failure
+ * exhausts the retry and is expected to surface as an IOException.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test(expected = IOException.class)
+ public void testTableRecordReaderScannerFailMapreduceTwice() throws IOException,
+ InterruptedException {
+ Table htable = createIOEScannerTable("table3-mr".getBytes(), 2);
+ runTestMapreduce(htable);
+ }
+
+ /**
+ * Run the test assuming a single NotServingRegionException, using the newer
+ * mapreduce API; the record reader should retry and recover.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testTableRecordReaderScannerTimeoutMapreduce()
+ throws IOException, InterruptedException {
+ Table htable = createDNRIOEScannerTable("table4-mr".getBytes(), 1);
+ runTestMapreduce(htable);
+ }
+
+ /**
+ * Run the test assuming NotServingRegionExceptions on two consecutive scanner
+ * invocations, using the newer mapreduce API; the second failure is expected
+ * to propagate as a NotServingRegionException.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
+ public void testTableRecordReaderScannerTimeoutMapreduceTwice()
+ throws IOException, InterruptedException {
+ Table htable = createDNRIOEScannerTable("table5-mr".getBytes(), 2);
+ runTestMapreduce(htable);
+ }
+
+ /**
+ * Verify the example we present in the javadocs on TableInputFormatBase.
+ */
+ @Test
+ public void testExtensionOfTableInputFormatBase()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ LOG.info("testing use of an InputFormat taht extends InputFormatBase");
+ final Table htable = createTable(Bytes.toBytes("exampleTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleTIF.class);
+ }
+
+ @Test
+ public void testJobConfigurableExtensionOfTableInputFormatBase()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
+ "using JobConfigurable.");
+ final Table htable = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleJobConfigurableTIF.class);
+ }
+
+ @Test
+ public void testDeprecatedExtensionOfTableInputFormatBase()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ LOG.info("testing use of an InputFormat taht extends InputFormatBase, " +
+ "using the approach documented in 0.98.");
+ final Table htable = createTable(Bytes.toBytes("exampleDeprecatedTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleDeprecatedTIF.class);
+ }
+
+ void testInputFormat(Class<? extends InputFormat> clazz)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
+ job.setInputFormatClass(clazz);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setMapperClass(ExampleVerifier.class);
+ job.setNumReduceTasks(0);
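+ // Map-only job: ExampleVerifier only increments counters for the cells it
+ // sees, and NullOutputFormat discards the (empty) output.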
+
+ LOG.debug("submitting job.");
+ assertTrue("job failed!", job.waitForCompletion(true));
+ assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
+ assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
+ assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
+ assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
+ assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
+ assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
+ }
+
+ public static class ExampleVerifier extends TableMapper<NullWritable, NullWritable> {
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value, Context context)
+ throws IOException {
+ for (Cell cell : value.listCells()) {
+ context.getCounter(TestTableInputFormat.class.getName() + ":row",
+ Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
+ .increment(1L);
+ context.getCounter(TestTableInputFormat.class.getName() + ":family",
+ Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
+ .increment(1L);
+ context.getCounter(TestTableInputFormat.class.getName() + ":value",
+ Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
+ .increment(1L);
+ }
+ }
+
+ }
+
+ public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ Connection connection = ConnectionFactory.createConnection(job);
+ Table exampleTable = connection.getTable(TableName.valueOf(("exampleDeprecatedTable")));
+ // mandatory
+ initializeTable(connection, exampleTable.getName());
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ // optional
+ Scan scan = new Scan();
+ for (byte[] family : inputColumns) {
+ scan.addFamily(family);
+ }
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ scan.setFilter(exampleFilter);
+ setScan(scan);
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to configure for job.", exception);
+ }
+ }
+
+ }
+
+ public static class ExampleJobConfigurableTIF extends TableInputFormatBase
+ implements JobConfigurable {
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ TableName tableName = TableName.valueOf("exampleJobConfigurableTable");
+ // mandatory
+ initializeTable(connection, tableName);
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ //optional
+ Scan scan = new Scan();
+ for (byte[] family : inputColumns) {
+ scan.addFamily(family);
+ }
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ scan.setFilter(exampleFilter);
+ setScan(scan);
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to initialize.", exception);
+ }
+ }
+ }
+
+ public static class ExampleTIF extends TableInputFormatBase {
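+ // Follows the extension pattern that testExtensionOfTableInputFormatBase
+ // verifies: open a Connection, call the mandatory initializeTable(), then
+ // narrow the Scan before handing it to setScan().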
+
+ @Override
+ protected void initialize(JobContext job) throws IOException {
+ Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(
+ job.getConfiguration()));
+ TableName tableName = TableName.valueOf("exampleTable");
+ // mandatory
+ initializeTable(connection, tableName);
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ //optional
+ Scan scan = new Scan();
+ for (byte[] family : inputColumns) {
+ scan.addFamily(family);
+ }
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ scan.setFilter(exampleFilter);
+ setScan(scan);
+ }
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
new file mode 100644
index 0000000..699e773
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({SmallTests.class})
+public class TestTableInputFormatBase {
+ @Test
+ public void testTableInputFormatBaseReverseDNSForIPv6()
+ throws UnknownHostException {
+ String address = "ipv6.google.com";
+ String localhost = null;
+ InetAddress addr = null;
+ TableInputFormat inputFormat = new TableInputFormat();
+ try {
+ localhost = InetAddress.getByName(address).getCanonicalHostName();
+ addr = Inet6Address.getByName(address);
+ } catch (UnknownHostException e) {
+ // ipv6.google.com did not resolve; forgive this test and skip it.
+ return;
+ }
+ System.out.println("Should retrun the hostname for this host " +
+ localhost + " addr : " + addr);
+ String actualHostName = inputFormat.reverseDNS(addr);
+ assertEquals("Should retrun the hostname for this host. Expected : " +
+ localhost + " Actual : " + actualHostName, localhost, actualHostName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
new file mode 100644
index 0000000..99b40b9
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableInputFormatScan part 1.
+ * @see TestTableInputFormatScanBase
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
+
+ /**
+ * Tests a MR scan over the entire table (no start or stop row).
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
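+ // testScan's arguments appear to be (start row, stop row, expected last
+ // row); null means open-ended on that side, so this covers the whole table.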
+ testScan(null, null, null);
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToAPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "app", "apo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToBBA()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "bba", "baz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToBBB()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "bbb", "bba");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanEmptyToOPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan(null, "opp", "opo");
+ }
+
+ /**
+ * Tests a MR scan using a specific number of mappers. All region sizes in the
+ * test table default to 0, so the average region size is 1 (the smallest
+ * positive value). When hbase.mapreduce.input.ratio is set to -1, every
+ * region is cut into two MapReduce input splits; when it is set to 100, the
+ * sum of all region sizes is less than the average region size, so all
+ * regions are combined into a single MapReduce input split.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testGetSplits() throws IOException, InterruptedException, ClassNotFoundException {
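+ // -1 forces the minimum ratio, so every region is cut in two; with 100 the
+ // regions' total size falls below the threshold, so everything collapses
+ // into a single split.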
+ testNumOfSplits("-1", 52);
+ testNumOfSplits("100", 1);
+ }
+
+ /**
+ * Tests the getSplitKey() method in TableInputFormatBase.java
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testGetSplitsPoint() throws IOException, InterruptedException,
+ ClassNotFoundException {
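+ // getSplitKey() picks a key roughly midway between the start and end keys.
+ // In text mode the computed bytes stay in the printable range (note the '~'
+ // padding in case 7); in binary mode any byte value may appear, hence the
+ // negative byte literals in several expected split points.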
+ byte[] start1 = { 'a', 'a', 'a', 'b', 'c', 'd', 'e', 'f' };
+ byte[] end1 = { 'a', 'a', 'a', 'f', 'f' };
+ byte[] splitPoint1 = { 'a', 'a', 'a', 'd', 'd', -78, 50, -77 };
+ testGetSplitKey(start1, end1, splitPoint1, true);
+
+ byte[] start2 = { '1', '1', '1', '0', '0', '0' };
+ byte[] end2 = { '1', '1', '2', '5', '7', '9', '0' };
+ byte[] splitPoint2 = { '1', '1', '1', -78, -77, -76, -104 };
+ testGetSplitKey(start2, end2, splitPoint2, true);
+
+ byte[] start3 = { 'a', 'a', 'a', 'a', 'a', 'a' };
+ byte[] end3 = { 'a', 'a', 'b' };
+ byte[] splitPoint3 = { 'a', 'a', 'a', -80, -80, -80 };
+ testGetSplitKey(start3, end3, splitPoint3, true);
+
+ byte[] start4 = { 'a', 'a', 'a' };
+ byte[] end4 = { 'a', 'a', 'a', 'z' };
+ byte[] splitPoint4 = { 'a', 'a', 'a', '=' };
+ testGetSplitKey(start4, end4, splitPoint4, true);
+
+ byte[] start5 = { 'a', 'a', 'a' };
+ byte[] end5 = { 'a', 'a', 'b', 'a' };
+ byte[] splitPoint5 = { 'a', 'a', 'a', -80 };
+ testGetSplitKey(start5, end5, splitPoint5, true);
+
+ // Test Case 6: empty start key and "hhhqqqww"; split point is "h"
+ byte[] start6 = {};
+ byte[] end6 = { 'h', 'h', 'h', 'q', 'q', 'q', 'w', 'w' };
+ byte[] splitPointText6 = { 'h' };
+ byte[] splitPointBinary6 = { 104 };
+ testGetSplitKey(start6, end6, splitPointText6, true);
+ testGetSplitKey(start6, end6, splitPointBinary6, false);
+
+ // Test Case 7: "ffffaaa" and empty key, split point depends on the mode we choose(text key or
+ // binary key).
+ byte[] start7 = { 'f', 'f', 'f', 'f', 'a', 'a', 'a' };
+ byte[] end7 = {};
+ byte[] splitPointText7 = { 'f', '~', '~', '~', '~', '~', '~' };
+ byte[] splitPointBinary7 = { 'f', -1, -1, -1, -1, -1, -1 };
+ testGetSplitKey(start7, end7, splitPointText7, true);
+ testGetSplitKey(start7, end7, splitPointBinary7, false);
+
+ // Test Case 8: both start key and end key are empty. Split point depends on the mode we
+ // choose (text key or binary key).
+ byte[] start8 = {};
+ byte[] end8 = {};
+ byte[] splitPointText8 = { 'O' };
+ byte[] splitPointBinary8 = { 0 };
+ testGetSplitKey(start8, end8, splitPointText8, true);
+ testGetSplitKey(start8, end8, splitPointBinary8, false);
+
+ // Test Case 9: Binary Key example
+ byte[] start9 = { 13, -19, 126, 127 };
+ byte[] end9 = { 13, -19, 127, 0 };
+ byte[] splitPoint9 = { 13, -19, 126, -65 };
+ testGetSplitKey(start9, end9, splitPoint9, false);
+
+ // Test Case 10: binary key split when the start key byte is positive ('x')
+ // and the end key byte is negative (-128)
+ byte[] start10 = { 'x' };
+ byte[] end10 = { -128 };
+ byte[] splitPoint10 = { '|' };
+ testGetSplitKey(start10, end10, splitPoint10, false);
+
+ // Test Case 11: binary key split when both the start and end key bytes are
+ // negative
+ byte[] start11 = { -100 };
+ byte[] end11 = { -90 };
+ byte[] splitPoint11 = { -95 };
+ testGetSplitKey(start11, end11, splitPoint11, false);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
new file mode 100644
index 0000000..02f893f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
@@ -0,0 +1,118 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableInputFormatScan part 2.
+ * @see TestTableInputFormatScanBase
+ */
+@Category({VerySlowMapReduceTests.class, LargeTests.class})
+public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase {
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanOBBToOPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("obb", "opp", "opo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanOBBToQPP()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("obb", "qpp", "qpo");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanOPPToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("opp", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanYYXToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("yyx", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanYYYToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("yyy", null, "zzz");
+ }
+
+ /**
+ * Tests a MR scan using specific start and stop rows.
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testScanYZYToEmpty()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScan("yzy", null, "zzz");
+ }
+
+ @Test
+ public void testScanFromConfiguration()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ testScanFromConfiguration("bba", "bbd", "bbc");
+ }
+}