You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:09 UTC
[09/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of
hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
deleted file mode 100644
index 0f49333..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-
-/**
- * <p>
- * Tests various scan start and stop row scenarios. This is set in a scan and
- * tested in a MapReduce job to see if that is handed over and done properly
- * too.
- * </p>
- * <p>
- * This test is broken into two parts in order to side-step the test timeout
- * period of 900, as documented in HBASE-8326.
- * </p>
- */
-public abstract class TestTableInputFormatScanBase {
-
- private static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
- static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- static final TableName TABLE_NAME = TableName.valueOf("scantest");
- static final byte[][] INPUT_FAMILYS = {Bytes.toBytes("content1"), Bytes.toBytes("content2")};
- static final String KEY_STARTROW = "startRow";
- static final String KEY_LASTROW = "stpRow";
-
- private static Table table = null;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- // test intermittently fails under hadoop2 (2.0.2-alpha) if shortcircuit-read (scr) is on.
- // this turns it off for this test. TODO: Figure out why scr breaks recovery.
- System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
-
- // switch TIF to log at DEBUG level
- TEST_UTIL.enableDebug(TableInputFormat.class);
- TEST_UTIL.enableDebug(TableInputFormatBase.class);
- // start mini hbase cluster
- TEST_UTIL.startMiniCluster(3);
- // create and fill table
- table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, INPUT_FAMILYS);
- TEST_UTIL.loadTable(table, INPUT_FAMILYS, null, false);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- /**
- * Pass the key and value to reduce.
- */
- public static class ScanMapper
- extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
-
- /**
- * Pass the key and value to reduce.
- *
- * @param key The key, here "aaa", "aab" etc.
- * @param value The value is the same as the key.
- * @param context The task context.
- * @throws IOException When reading the rows fails.
- */
- @Override
- public void map(ImmutableBytesWritable key, Result value,
- Context context)
- throws IOException, InterruptedException {
- if (value.size() != 2) {
- throw new IOException("There should be two input columns");
- }
- Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
- cfMap = value.getMap();
-
- if (!cfMap.containsKey(INPUT_FAMILYS[0]) || !cfMap.containsKey(INPUT_FAMILYS[1])) {
- throw new IOException("Wrong input columns. Missing: '" +
- Bytes.toString(INPUT_FAMILYS[0]) + "' or '" + Bytes.toString(INPUT_FAMILYS[1]) + "'.");
- }
-
- String val0 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[0], null));
- String val1 = Bytes.toStringBinary(value.getValue(INPUT_FAMILYS[1], null));
- LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
- ", value -> (" + val0 + ", " + val1 + ")");
- context.write(key, key);
- }
- }
-
- /**
- * Checks the last and first key seen against the scanner boundaries.
- */
- public static class ScanReducer
- extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
- NullWritable, NullWritable> {
-
- private String first = null;
- private String last = null;
-
- protected void reduce(ImmutableBytesWritable key,
- Iterable<ImmutableBytesWritable> values, Context context)
- throws IOException ,InterruptedException {
- int count = 0;
- for (ImmutableBytesWritable value : values) {
- String val = Bytes.toStringBinary(value.get());
- LOG.info("reduce: key[" + count + "] -> " +
- Bytes.toStringBinary(key.get()) + ", value -> " + val);
- if (first == null) first = val;
- last = val;
- count++;
- }
- }
-
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- Configuration c = context.getConfiguration();
- String startRow = c.get(KEY_STARTROW);
- String lastRow = c.get(KEY_LASTROW);
- LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
- LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
- if (startRow != null && startRow.length() > 0) {
- assertEquals(startRow, first);
- }
- if (lastRow != null && lastRow.length() > 0) {
- assertEquals(lastRow, last);
- }
- }
-
- }
-
- /**
- * Tests an MR Scan initialized from properties set in the Configuration.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- protected void testScanFromConfiguration(String start, String stop, String last)
- throws IOException, InterruptedException, ClassNotFoundException {
- String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
- "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
- Configuration c = new Configuration(TEST_UTIL.getConfiguration());
- c.set(TableInputFormat.INPUT_TABLE, TABLE_NAME.getNameAsString());
- c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILYS[0]) + ", "
- + Bytes.toString(INPUT_FAMILYS[1]));
- c.set(KEY_STARTROW, start != null ? start : "");
- c.set(KEY_LASTROW, last != null ? last : "");
-
- if (start != null) {
- c.set(TableInputFormat.SCAN_ROW_START, start);
- }
-
- if (stop != null) {
- c.set(TableInputFormat.SCAN_ROW_STOP, stop);
- }
-
- Job job = new Job(c, jobName);
- job.setMapperClass(ScanMapper.class);
- job.setReducerClass(ScanReducer.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
- job.setMapOutputValueClass(ImmutableBytesWritable.class);
- job.setInputFormatClass(TableInputFormat.class);
- job.setNumReduceTasks(1);
- FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
- TableMapReduceUtil.addDependencyJars(job);
- assertTrue(job.waitForCompletion(true));
- }
-
- /**
- * Tests a MR scan using specific start and stop rows.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- protected void testScan(String start, String stop, String last)
- throws IOException, InterruptedException, ClassNotFoundException {
- String jobName = "Scan" + (start != null ? start.toUpperCase(Locale.ROOT) : "Empty") +
- "To" + (stop != null ? stop.toUpperCase(Locale.ROOT) : "Empty");
- LOG.info("Before map/reduce startup - job " + jobName);
- Configuration c = new Configuration(TEST_UTIL.getConfiguration());
- Scan scan = new Scan();
- scan.addFamily(INPUT_FAMILYS[0]);
- scan.addFamily(INPUT_FAMILYS[1]);
- if (start != null) {
- scan.setStartRow(Bytes.toBytes(start));
- }
- c.set(KEY_STARTROW, start != null ? start : "");
- if (stop != null) {
- scan.setStopRow(Bytes.toBytes(stop));
- }
- c.set(KEY_LASTROW, last != null ? last : "");
- LOG.info("scan before: " + scan);
- Job job = new Job(c, jobName);
- TableMapReduceUtil.initTableMapperJob(
- TABLE_NAME, scan, ScanMapper.class,
- ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
- job.setReducerClass(ScanReducer.class);
- job.setNumReduceTasks(1); // one to get final "first" and "last" key
- FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
- LOG.info("Started " + job.getJobName());
- assertTrue(job.waitForCompletion(true));
- LOG.info("After map/reduce completion - job " + jobName);
- }
-
-
- /**
- * Tests a MR scan using data skew auto-balance
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- public void testNumOfSplits(String ratio, int expectedNumOfSplits) throws IOException,
- InterruptedException,
- ClassNotFoundException {
- String jobName = "TestJobForNumOfSplits";
- LOG.info("Before map/reduce startup - job " + jobName);
- Configuration c = new Configuration(TEST_UTIL.getConfiguration());
- Scan scan = new Scan();
- scan.addFamily(INPUT_FAMILYS[0]);
- scan.addFamily(INPUT_FAMILYS[1]);
- c.set("hbase.mapreduce.input.autobalance", "true");
- c.set("hbase.mapreduce.input.autobalance.maxskewratio", ratio);
- c.set(KEY_STARTROW, "");
- c.set(KEY_LASTROW, "");
- Job job = new Job(c, jobName);
- TableMapReduceUtil.initTableMapperJob(TABLE_NAME.getNameAsString(), scan, ScanMapper.class,
- ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
- TableInputFormat tif = new TableInputFormat();
- tif.setConf(job.getConfiguration());
- Assert.assertEquals(TABLE_NAME, table.getName());
- List<InputSplit> splits = tif.getSplits(job);
- Assert.assertEquals(expectedNumOfSplits, splits.size());
- }
-
- /**
- * Tests for the getSplitKey() method in TableInputFormatBase.java
- */
- public void testGetSplitKey(byte[] startKey, byte[] endKey, byte[] splitKey, boolean isText) {
- byte[] result = TableInputFormatBase.getSplitKey(startKey, endKey, isText);
- Assert.assertArrayEquals(splitKey, result);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
deleted file mode 100644
index d702e0d..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotEnabledException;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
- * on our tables is simple - take every row in the table, reverse the value of
- * a particular cell, and write it back to the table.
- */
-
-@Category({VerySlowMapReduceTests.class, LargeTests.class})
-public class TestTableMapReduce extends TestTableMapReduceBase {
- private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
-
- @Override
- protected Log getLog() { return LOG; }
-
- /**
- * Pass the given key and processed record reduce
- */
- static class ProcessContentsMapper extends TableMapper<ImmutableBytesWritable, Put> {
-
- /**
- * Pass the key, and reversed value to reduce
- *
- * @param key
- * @param value
- * @param context
- * @throws IOException
- */
- @Override
- public void map(ImmutableBytesWritable key, Result value,
- Context context)
- throws IOException, InterruptedException {
- if (value.size() != 1) {
- throw new IOException("There should only be one input column");
- }
- Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
- cf = value.getMap();
- if(!cf.containsKey(INPUT_FAMILY)) {
- throw new IOException("Wrong input columns. Missing: '" +
- Bytes.toString(INPUT_FAMILY) + "'.");
- }
-
- // Get the original value and reverse it
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
- StringBuilder newValue = new StringBuilder(originalValue);
- newValue.reverse();
- // Now set the value to be collected
- Put outval = new Put(key.get());
- outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
- context.write(key, outval);
- }
- }
-
- @Override
- protected void runTestOnTable(Table table) throws IOException {
- Job job = null;
- try {
- LOG.info("Before map/reduce startup");
- job = new Job(table.getConfiguration(), "process column contents");
- job.setNumReduceTasks(1);
- Scan scan = new Scan();
- scan.addFamily(INPUT_FAMILY);
- TableMapReduceUtil.initTableMapperJob(
- table.getName().getNameAsString(), scan,
- ProcessContentsMapper.class, ImmutableBytesWritable.class,
- Put.class, job);
- TableMapReduceUtil.initTableReducerJob(
- table.getName().getNameAsString(),
- IdentityTableReducer.class, job);
- FileOutputFormat.setOutputPath(job, new Path("test"));
- LOG.info("Started " + table.getName().getNameAsString());
- assertTrue(job.waitForCompletion(true));
- LOG.info("After map/reduce completion");
-
- // verify map-reduce results
- verify(table.getName());
-
- verifyJobCountersAreEmitted(job);
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- } finally {
- table.close();
- if (job != null) {
- FileUtil.fullyDelete(
- new File(job.getConfiguration().get("hadoop.tmp.dir")));
- }
- }
- }
-
- /**
- * Verify scan counters are emitted from the job
- * @param job
- * @throws IOException
- */
- private void verifyJobCountersAreEmitted(Job job) throws IOException {
- Counters counters = job.getCounters();
- Counter counter
- = counters.findCounter(TableRecordReaderImpl.HBASE_COUNTER_GROUP_NAME, "RPC_CALLS");
- assertNotNull("Unable to find Job counter for HBase scan metrics, RPC_CALLS", counter);
- assertTrue("Counter value for RPC_CALLS should be larger than 0", counter.getValue() > 0);
- }
-
- @Test(expected = TableNotEnabledException.class)
- public void testWritingToDisabledTable() throws IOException {
-
- try (Admin admin = UTIL.getConnection().getAdmin();
- Table table = UTIL.getConnection().getTable(TABLE_FOR_NEGATIVE_TESTS)) {
- admin.disableTable(table.getName());
- runTestOnTable(table);
- fail("Should not have reached here, should have thrown an exception");
- }
- }
-
- @Test(expected = TableNotFoundException.class)
- public void testWritingToNonExistentTable() throws IOException {
-
- try (Table table = UTIL.getConnection().getTable(TableName.valueOf("table-does-not-exist"))) {
- runTestOnTable(table);
- fail("Should not have reached here, should have thrown an exception");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
deleted file mode 100644
index 27bf063..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestRule;
-
-/**
- * A base class for a test Map/Reduce job over HBase tables. The map/reduce process we're testing
- * on our tables is simple - take every row in the table, reverse the value of a particular cell,
- * and write it back to the table. Implements common components between mapred and mapreduce
- * implementations.
- */
-public abstract class TestTableMapReduceBase {
- @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
- withTimeout(this.getClass()).withLookingForStuckThread(true).build();
- protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
- protected static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("mrtest");
- protected static final TableName TABLE_FOR_NEGATIVE_TESTS = TableName.valueOf("testfailuretable");
- protected static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
- protected static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
-
- protected static final byte[][] columns = new byte[][] {
- INPUT_FAMILY,
- OUTPUT_FAMILY
- };
-
- /**
- * Retrieve my logger instance.
- */
- protected abstract Log getLog();
-
- /**
- * Handles API-specifics for setting up and executing the job.
- */
- protected abstract void runTestOnTable(Table table) throws IOException;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- UTIL.startMiniCluster();
- Table table =
- UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, new byte[][] { INPUT_FAMILY,
- OUTPUT_FAMILY });
- UTIL.loadTable(table, INPUT_FAMILY, false);
- UTIL.createTable(TABLE_FOR_NEGATIVE_TESTS, new byte[][] { INPUT_FAMILY, OUTPUT_FAMILY });
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- UTIL.deleteTable(TABLE_FOR_NEGATIVE_TESTS);
- UTIL.shutdownMiniCluster();
- }
-
- /**
- * Test a map/reduce against a multi-region table
- * @throws IOException
- */
- @Test
- public void testMultiRegionTable() throws IOException {
- runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
- }
-
- @Test
- public void testCombiner() throws IOException {
- Configuration conf = new Configuration(UTIL.getConfiguration());
- // force use of combiner for testing purposes
- conf.setInt("mapreduce.map.combine.minspills", 1);
- runTestOnTable(UTIL.getConnection().getTable(MULTI_REGION_TABLE_NAME));
- }
-
- /**
- * Implements mapper logic for use across APIs.
- */
- protected static Put map(ImmutableBytesWritable key, Result value) throws IOException {
- if (value.size() != 1) {
- throw new IOException("There should only be one input column");
- }
- Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
- cf = value.getMap();
- if(!cf.containsKey(INPUT_FAMILY)) {
- throw new IOException("Wrong input columns. Missing: '" +
- Bytes.toString(INPUT_FAMILY) + "'.");
- }
-
- // Get the original value and reverse it
-
- String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
- StringBuilder newValue = new StringBuilder(originalValue);
- newValue.reverse();
-
- // Now set the value to be collected
-
- Put outval = new Put(key.get());
- outval.addColumn(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
- return outval;
- }
-
- protected void verify(TableName tableName) throws IOException {
- Table table = UTIL.getConnection().getTable(tableName);
- boolean verified = false;
- long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
- int numRetries = UTIL.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
- for (int i = 0; i < numRetries; i++) {
- try {
- getLog().info("Verification attempt #" + i);
- verifyAttempt(table);
- verified = true;
- break;
- } catch (NullPointerException e) {
- // If here, a cell was empty. Presume its because updates came in
- // after the scanner had been opened. Wait a while and retry.
- getLog().debug("Verification attempt failed: " + e.getMessage());
- }
- try {
- Thread.sleep(pause);
- } catch (InterruptedException e) {
- // continue
- }
- }
- assertTrue(verified);
- }
-
- /**
- * Looks at every value of the mapreduce output and verifies that indeed
- * the values have been reversed.
- * @param table Table to scan.
- * @throws IOException
- * @throws NullPointerException if we failed to find a cell value
- */
- private void verifyAttempt(final Table table) throws IOException, NullPointerException {
- Scan scan = new Scan();
- TableInputFormat.addColumns(scan, columns);
- ResultScanner scanner = table.getScanner(scan);
- try {
- Iterator<Result> itr = scanner.iterator();
- assertTrue(itr.hasNext());
- while(itr.hasNext()) {
- Result r = itr.next();
- if (getLog().isDebugEnabled()) {
- if (r.size() > 2 ) {
- throw new IOException("Too many results, expected 2 got " +
- r.size());
- }
- }
- byte[] firstValue = null;
- byte[] secondValue = null;
- int count = 0;
- for(Cell kv : r.listCells()) {
- if (count == 0) {
- firstValue = CellUtil.cloneValue(kv);
- }
- if (count == 1) {
- secondValue = CellUtil.cloneValue(kv);
- }
- count++;
- if (count == 2) {
- break;
- }
- }
-
-
- if (firstValue == null) {
- throw new NullPointerException(Bytes.toString(r.getRow()) +
- ": first value is null");
- }
- String first = Bytes.toString(firstValue);
-
- if (secondValue == null) {
- throw new NullPointerException(Bytes.toString(r.getRow()) +
- ": second value is null");
- }
- byte[] secondReversed = new byte[secondValue.length];
- for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
- secondReversed[i] = secondValue[j];
- }
- String second = Bytes.toString(secondReversed);
-
- if (first.compareTo(second) != 0) {
- if (getLog().isDebugEnabled()) {
- getLog().debug("second key is not the reverse of first. row=" +
- Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
- ", second value=" + second);
- }
- fail();
- }
- }
- } finally {
- scanner.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
deleted file mode 100644
index 303a144..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with the License. You may
- * obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * Test different variants of initTableMapperJob method
- */
-@Category({MapReduceTests.class, SmallTests.class})
-public class TestTableMapReduceUtil {
-
- /*
- * initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
- * the method depends on an online cluster.
- */
-
- @Test
- public void testInitTableMapperJob1() throws Exception {
- Configuration configuration = new Configuration();
- Job job = new Job(configuration, "tableName");
- // test
- TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
- Text.class, job, false, WALInputFormat.class);
- assertEquals(WALInputFormat.class, job.getInputFormatClass());
- assertEquals(Import.Importer.class, job.getMapperClass());
- assertEquals(LongWritable.class, job.getOutputKeyClass());
- assertEquals(Text.class, job.getOutputValueClass());
- assertNull(job.getCombinerClass());
- assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
- }
-
- @Test
- public void testInitTableMapperJob2() throws Exception {
- Configuration configuration = new Configuration();
- Job job = new Job(configuration, "tableName");
- TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
- Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
- assertEquals(WALInputFormat.class, job.getInputFormatClass());
- assertEquals(Import.Importer.class, job.getMapperClass());
- assertEquals(LongWritable.class, job.getOutputKeyClass());
- assertEquals(Text.class, job.getOutputValueClass());
- assertNull(job.getCombinerClass());
- assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
- }
-
- @Test
- public void testInitTableMapperJob3() throws Exception {
- Configuration configuration = new Configuration();
- Job job = new Job(configuration, "tableName");
- TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
- Import.Importer.class, Text.class, Text.class, job);
- assertEquals(TableInputFormat.class, job.getInputFormatClass());
- assertEquals(Import.Importer.class, job.getMapperClass());
- assertEquals(LongWritable.class, job.getOutputKeyClass());
- assertEquals(Text.class, job.getOutputValueClass());
- assertNull(job.getCombinerClass());
- assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
- }
-
- @Test
- public void testInitTableMapperJob4() throws Exception {
- Configuration configuration = new Configuration();
- Job job = new Job(configuration, "tableName");
- TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
- Import.Importer.class, Text.class, Text.class, job, false);
- assertEquals(TableInputFormat.class, job.getInputFormatClass());
- assertEquals(Import.Importer.class, job.getMapperClass());
- assertEquals(LongWritable.class, job.getOutputKeyClass());
- assertEquals(Text.class, job.getOutputValueClass());
- assertNull(job.getCombinerClass());
- assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
deleted file mode 100644
index 5e63082..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HDFSBlocksDistribution;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.TestRule;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
-import java.util.Arrays;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
-import org.apache.hadoop.hbase.util.FSUtils;
-
-@Category({VerySlowMapReduceTests.class, LargeTests.class})
-public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBase {
- private static final Log LOG = LogFactory.getLog(TestTableSnapshotInputFormat.class);
- @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
- withTimeout(this.getClass()).withLookingForStuckThread(true).build();
-
- private static final byte[] bbb = Bytes.toBytes("bbb");
- private static final byte[] yyy = Bytes.toBytes("yyy");
-
- @Rule
- public TestName name = new TestName();
-
- @Override
- protected byte[] getStartRow() {
- return bbb;
- }
-
- @Override
- protected byte[] getEndRow() {
- return yyy;
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
- @Test
- public void testGetBestLocations() throws IOException {
- TableSnapshotInputFormatImpl tsif = new TableSnapshotInputFormatImpl();
- Configuration conf = UTIL.getConfiguration();
-
- HDFSBlocksDistribution blockDistribution = new HDFSBlocksDistribution();
- Assert.assertEquals(Lists.newArrayList(),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
- Assert.assertEquals(Lists.newArrayList("h1"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 1);
- Assert.assertEquals(Lists.newArrayList("h1"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 1);
- Assert.assertEquals(Lists.newArrayList("h1"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution = new HDFSBlocksDistribution();
- blockDistribution.addHostsAndBlockWeight(new String[] {"h1"}, 10);
- blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 7);
- blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 5);
- blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 1);
- Assert.assertEquals(Lists.newArrayList("h1"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 2);
- Assert.assertEquals(Lists.newArrayList("h1", "h2"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution.addHostsAndBlockWeight(new String[] {"h2"}, 3);
- Assert.assertEquals(Lists.newArrayList("h2", "h1"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
-
- blockDistribution.addHostsAndBlockWeight(new String[] {"h3"}, 6);
- blockDistribution.addHostsAndBlockWeight(new String[] {"h4"}, 9);
-
- Assert.assertEquals(Lists.newArrayList("h2", "h3", "h4", "h1"),
- TableSnapshotInputFormatImpl.getBestLocations(conf, blockDistribution));
- }
-
- public static enum TestTableSnapshotCounters {
- VALIDATION_ERROR
- }
-
- public static class TestTableSnapshotMapper
- extends TableMapper<ImmutableBytesWritable, NullWritable> {
- @Override
- protected void map(ImmutableBytesWritable key, Result value,
- Context context) throws IOException, InterruptedException {
- // Validate a single row coming from the snapshot, and emit the row key
- verifyRowFromMap(key, value);
- context.write(key, NullWritable.get());
- }
- }
-
- public static class TestTableSnapshotReducer
- extends Reducer<ImmutableBytesWritable, NullWritable, NullWritable, NullWritable> {
- HBaseTestingUtility.SeenRowTracker rowTracker =
- new HBaseTestingUtility.SeenRowTracker(bbb, yyy);
- @Override
- protected void reduce(ImmutableBytesWritable key, Iterable<NullWritable> values,
- Context context) throws IOException, InterruptedException {
- rowTracker.addRow(key.get());
- }
-
- @Override
- protected void cleanup(Context context) throws IOException,
- InterruptedException {
- rowTracker.validate();
- }
- }
-
- @Test
- public void testInitTableSnapshotMapperJobConfig() throws Exception {
- setupCluster();
- final TableName tableName = TableName.valueOf(name.getMethodName());
- String snapshotName = "foo";
-
- try {
- createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 1);
- Job job = new Job(UTIL.getConfiguration());
- Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
-
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, false, tmpTableDir);
-
- // TODO: would be better to examine directly the cache instance that results from this
- // config. Currently this is not possible because BlockCache initialization is static.
- Assert.assertEquals(
- "Snapshot job should be configured for default LruBlockCache.",
- HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT,
- job.getConfiguration().getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, -1), 0.01);
- Assert.assertEquals(
- "Snapshot job should not use BucketCache.",
- 0, job.getConfiguration().getFloat("hbase.bucketcache.size", -1), 0.01);
- } finally {
- UTIL.getAdmin().deleteSnapshot(snapshotName);
- UTIL.deleteTable(tableName);
- tearDownCluster();
- }
- }
-
- @Override
- public void testRestoreSnapshotDoesNotCreateBackRefLinksInit(TableName tableName,
- String snapshotName, Path tmpTableDir) throws Exception {
- Job job = new Job(UTIL.getConfiguration());
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- new Scan(), TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, false, tmpTableDir);
- }
-
- @Override
- public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
- int numRegions, int expectedNumSplits) throws Exception {
- setupCluster();
- final TableName tableName = TableName.valueOf(name.getMethodName());
- try {
- createTableAndSnapshot(
- util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
-
- Job job = new Job(util.getConfiguration());
- Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
- Scan scan = new Scan(getStartRow(), getEndRow()); // limit the scan
-
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, false, tmpTableDir);
-
- verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
-
- } finally {
- util.getAdmin().deleteSnapshot(snapshotName);
- util.deleteTable(tableName);
- tearDownCluster();
- }
- }
-
- public static void blockUntilSplitFinished(HBaseTestingUtility util, TableName tableName,
- int expectedRegionSize) throws Exception {
- for (int i = 0; i < 100; i++) {
- List<HRegionInfo> hRegionInfoList = util.getAdmin().getTableRegions(tableName);
- if (hRegionInfoList.size() >= expectedRegionSize) {
- break;
- }
- Thread.sleep(1000);
- }
- }
-
- @Test
- public void testNoDuplicateResultsWhenSplitting() throws Exception {
- setupCluster();
- TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
- String snapshotName = "testSnapshotBug";
- try {
- if (UTIL.getAdmin().tableExists(tableName)) {
- UTIL.deleteTable(tableName);
- }
-
- UTIL.createTable(tableName, FAMILIES);
- Admin admin = UTIL.getAdmin();
-
- // put some stuff in the table
- Table table = UTIL.getConnection().getTable(tableName);
- UTIL.loadTable(table, FAMILIES);
-
- // split to 2 regions
- admin.split(tableName, Bytes.toBytes("eee"));
- blockUntilSplitFinished(UTIL, tableName, 2);
-
- Path rootDir = FSUtils.getRootDir(UTIL.getConfiguration());
- FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
-
- SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
- null, snapshotName, rootDir, fs, true);
-
- // load different values
- byte[] value = Bytes.toBytes("after_snapshot_value");
- UTIL.loadTable(table, FAMILIES, value);
-
- // cause flush to create new files in the region
- admin.flush(tableName);
- table.close();
-
- Job job = new Job(UTIL.getConfiguration());
- Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
- // limit the scan
- Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
-
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
- TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
- tmpTableDir);
-
- verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
- } finally {
- UTIL.getAdmin().deleteSnapshot(snapshotName);
- UTIL.deleteTable(tableName);
- tearDownCluster();
- }
- }
-
- private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
- byte[] startRow, byte[] stopRow)
- throws IOException, InterruptedException {
- TableSnapshotInputFormat tsif = new TableSnapshotInputFormat();
- List<InputSplit> splits = tsif.getSplits(job);
-
- Assert.assertEquals(expectedNumSplits, splits.size());
-
- HBaseTestingUtility.SeenRowTracker rowTracker =
- new HBaseTestingUtility.SeenRowTracker(startRow, stopRow);
-
- for (int i = 0; i < splits.size(); i++) {
- // validate input split
- InputSplit split = splits.get(i);
- Assert.assertTrue(split instanceof TableSnapshotRegionSplit);
-
- // validate record reader
- TaskAttemptContext taskAttemptContext = mock(TaskAttemptContext.class);
- when(taskAttemptContext.getConfiguration()).thenReturn(job.getConfiguration());
- RecordReader<ImmutableBytesWritable, Result> rr =
- tsif.createRecordReader(split, taskAttemptContext);
- rr.initialize(split, taskAttemptContext);
-
- // validate we can read all the data back
- while (rr.nextKeyValue()) {
- byte[] row = rr.getCurrentKey().get();
- verifyRowFromMap(rr.getCurrentKey(), rr.getCurrentValue());
- rowTracker.addRow(row);
- }
-
- rr.close();
- }
-
- // validate all rows are seen
- rowTracker.validate();
- }
-
- @Override
- protected void testWithMapReduceImpl(HBaseTestingUtility util, TableName tableName,
- String snapshotName, Path tableDir, int numRegions, int expectedNumSplits,
- boolean shutdownCluster) throws Exception {
- doTestWithMapReduce(util, tableName, snapshotName, getStartRow(), getEndRow(), tableDir,
- numRegions, expectedNumSplits, shutdownCluster);
- }
-
- // this is also called by the IntegrationTestTableSnapshotInputFormat
- public static void doTestWithMapReduce(HBaseTestingUtility util, TableName tableName,
- String snapshotName, byte[] startRow, byte[] endRow, Path tableDir, int numRegions,
- int expectedNumSplits, boolean shutdownCluster) throws Exception {
-
- LOG.info("testing with MapReduce");
-
- LOG.info("create the table and snapshot");
- createTableAndSnapshot(util, tableName, snapshotName, startRow, endRow, numRegions);
-
- if (shutdownCluster) {
- LOG.info("shutting down hbase cluster.");
- util.shutdownMiniHBaseCluster();
- }
-
- try {
- // create the job
- Job job = new Job(util.getConfiguration());
- Scan scan = new Scan(startRow, endRow); // limit the scan
-
- job.setJarByClass(util.getClass());
- TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
- TestTableSnapshotInputFormat.class);
-
- TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
- scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
- NullWritable.class, job, true, tableDir);
-
- job.setReducerClass(TestTableSnapshotInputFormat.TestTableSnapshotReducer.class);
- job.setNumReduceTasks(1);
- job.setOutputFormatClass(NullOutputFormat.class);
-
- Assert.assertTrue(job.waitForCompletion(true));
- } finally {
- if (!shutdownCluster) {
- util.getAdmin().deleteSnapshot(snapshotName);
- util.deleteTable(tableName);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
deleted file mode 100644
index 4382c9c..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSplit.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-import java.util.HashSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@Category({MapReduceTests.class, SmallTests.class})
-public class TestTableSplit {
- @Rule
- public TestName name = new TestName();
-
- @Test
- public void testHashCode() {
- TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
- "row-start".getBytes(),
- "row-end".getBytes(), "location");
- TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()),
- "row-start".getBytes(),
- "row-end".getBytes(), "location");
- assertEquals (split1, split2);
- assertTrue (split1.hashCode() == split2.hashCode());
- HashSet<TableSplit> set = new HashSet<>(2);
- set.add(split1);
- set.add(split2);
- assertTrue(set.size() == 1);
- }
-
- /**
- * length of region should not influence hashcode
- * */
- @Test
- public void testHashCode_length() {
- TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
- "row-start".getBytes(),
- "row-end".getBytes(), "location", 1984);
- TableSplit split2 = new TableSplit(TableName.valueOf(name.getMethodName()),
- "row-start".getBytes(),
- "row-end".getBytes(), "location", 1982);
-
- assertEquals (split1, split2);
- assertTrue (split1.hashCode() == split2.hashCode());
- HashSet<TableSplit> set = new HashSet<>(2);
- set.add(split1);
- set.add(split2);
- assertTrue(set.size() == 1);
- }
-
- /**
- * Length of region need to be properly serialized.
- * */
- @Test
- public void testLengthIsSerialized() throws Exception {
- TableSplit split1 = new TableSplit(TableName.valueOf(name.getMethodName()),
- "row-start".getBytes(),
- "row-end".getBytes(), "location", 666);
-
- TableSplit deserialized = new TableSplit(TableName.valueOf(name.getMethodName()),
- "row-start2".getBytes(),
- "row-end2".getBytes(), "location1");
- ReflectionUtils.copy(new Configuration(), split1, deserialized);
-
- Assert.assertEquals(666, deserialized.getLength());
- }
-
- @Test
- public void testToString() {
- TableSplit split =
- new TableSplit(TableName.valueOf(name.getMethodName()), "row-start".getBytes(), "row-end".getBytes(),
- "location");
- String str =
- "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, "
- + "end row: row-end, region location: location, "
- + "encoded region name: )";
- Assert.assertEquals(str, split.toString());
-
- split =
- new TableSplit(TableName.valueOf(name.getMethodName()), null, "row-start".getBytes(),
- "row-end".getBytes(), "location", "encoded-region-name", 1000L);
- str =
- "HBase table split(table name: " + name.getMethodName() + ", scan: , start row: row-start, "
- + "end row: row-end, region location: location, "
- + "encoded region name: encoded-region-name)";
- Assert.assertEquals(str, split.toString());
-
- split = new TableSplit((TableName) null, null, null, null);
- str =
- "HBase table split(table name: null, scan: , start row: null, "
- + "end row: null, region location: null, "
- + "encoded region name: )";
- Assert.assertEquals(str, split.toString());
-
- split = new TableSplit((TableName) null, null, null, null, null, null, 1000L);
- str =
- "HBase table split(table name: null, scan: , start row: null, "
- + "end row: null, region location: null, "
- + "encoded region name: null)";
- Assert.assertEquals(str, split.toString());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
deleted file mode 100644
index 6796c94..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTimeRangeMapRed.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestTimeRangeMapRed {
- private final static Log log = LogFactory.getLog(TestTimeRangeMapRed.class);
- private static final HBaseTestingUtility UTIL =
- new HBaseTestingUtility();
- private Admin admin;
-
- private static final byte [] KEY = Bytes.toBytes("row1");
- private static final NavigableMap<Long, Boolean> TIMESTAMP = new TreeMap<>();
- static {
- TIMESTAMP.put((long)1245620000, false);
- TIMESTAMP.put((long)1245620005, true); // include
- TIMESTAMP.put((long)1245620010, true); // include
- TIMESTAMP.put((long)1245620055, true); // include
- TIMESTAMP.put((long)1245620100, true); // include
- TIMESTAMP.put((long)1245620150, false);
- TIMESTAMP.put((long)1245620250, false);
- }
- static final long MINSTAMP = 1245620005;
- static final long MAXSTAMP = 1245620100 + 1; // maxStamp itself is excluded. so increment it.
-
- static final TableName TABLE_NAME = TableName.valueOf("table123");
- static final byte[] FAMILY_NAME = Bytes.toBytes("text");
- static final byte[] COLUMN_NAME = Bytes.toBytes("input");
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- UTIL.startMiniCluster();
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- UTIL.shutdownMiniCluster();
- }
-
- @Before
- public void before() throws Exception {
- this.admin = UTIL.getAdmin();
- }
-
- private static class ProcessTimeRangeMapper
- extends TableMapper<ImmutableBytesWritable, MapWritable>
- implements Configurable {
-
- private Configuration conf = null;
- private Table table = null;
-
- @Override
- public void map(ImmutableBytesWritable key, Result result,
- Context context)
- throws IOException {
- List<Long> tsList = new ArrayList<>();
- for (Cell kv : result.listCells()) {
- tsList.add(kv.getTimestamp());
- }
-
- List<Put> puts = new ArrayList<>();
- for (Long ts : tsList) {
- Put put = new Put(key.get());
- put.setDurability(Durability.SKIP_WAL);
- put.addColumn(FAMILY_NAME, COLUMN_NAME, ts, Bytes.toBytes(true));
- puts.add(put);
- }
- table.put(puts);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration configuration) {
- this.conf = configuration;
- try {
- Connection connection = ConnectionFactory.createConnection(conf);
- table = connection.getTable(TABLE_NAME);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- @Test
- public void testTimeRangeMapRed()
- throws IOException, InterruptedException, ClassNotFoundException {
- final HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
- final HColumnDescriptor col = new HColumnDescriptor(FAMILY_NAME);
- col.setMaxVersions(Integer.MAX_VALUE);
- desc.addFamily(col);
- admin.createTable(desc);
- List<Put> puts = new ArrayList<>();
- for (Map.Entry<Long, Boolean> entry : TIMESTAMP.entrySet()) {
- Put put = new Put(KEY);
- put.setDurability(Durability.SKIP_WAL);
- put.addColumn(FAMILY_NAME, COLUMN_NAME, entry.getKey(), Bytes.toBytes(false));
- puts.add(put);
- }
- Table table = UTIL.getConnection().getTable(desc.getTableName());
- table.put(puts);
- runTestOnTable();
- verify(table);
- table.close();
- }
-
- private void runTestOnTable()
- throws IOException, InterruptedException, ClassNotFoundException {
- Job job = null;
- try {
- job = new Job(UTIL.getConfiguration(), "test123");
- job.setOutputFormatClass(NullOutputFormat.class);
- job.setNumReduceTasks(0);
- Scan scan = new Scan();
- scan.addColumn(FAMILY_NAME, COLUMN_NAME);
- scan.setTimeRange(MINSTAMP, MAXSTAMP);
- scan.setMaxVersions();
- TableMapReduceUtil.initTableMapperJob(TABLE_NAME,
- scan, ProcessTimeRangeMapper.class, Text.class, Text.class, job);
- job.waitForCompletion(true);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- if (job != null) {
- FileUtil.fullyDelete(
- new File(job.getConfiguration().get("hadoop.tmp.dir")));
- }
- }
- }
-
- private void verify(final Table table) throws IOException {
- Scan scan = new Scan();
- scan.addColumn(FAMILY_NAME, COLUMN_NAME);
- scan.setMaxVersions(1);
- ResultScanner scanner = table.getScanner(scan);
- for (Result r: scanner) {
- for (Cell kv : r.listCells()) {
- log.debug(Bytes.toString(r.getRow()) + "\t" + Bytes.toString(CellUtil.cloneFamily(kv))
- + "\t" + Bytes.toString(CellUtil.cloneQualifier(kv))
- + "\t" + kv.getTimestamp() + "\t" + Bytes.toBoolean(CellUtil.cloneValue(kv)));
- org.junit.Assert.assertEquals(TIMESTAMP.get(kv.getTimestamp()),
- Bytes.toBoolean(CellUtil.cloneValue(kv)));
- }
- }
- scanner.close();
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
deleted file mode 100644
index 427c5cc..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.ArrayList;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.LauncherSecurityManager;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Basic test for the WALPlayer M/R tool
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestWALPlayer {
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static MiniHBaseCluster cluster;
- private static Path rootDir;
- private static Path walRootDir;
- private static FileSystem fs;
- private static FileSystem logFs;
- private static Configuration conf;
-
- @Rule
- public TestName name = new TestName();
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- conf= TEST_UTIL.getConfiguration();
- rootDir = TEST_UTIL.createRootDir();
- walRootDir = TEST_UTIL.createWALRootDir();
- fs = FSUtils.getRootDirFileSystem(conf);
- logFs = FSUtils.getWALFileSystem(conf);
- cluster = TEST_UTIL.startMiniCluster();
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- fs.delete(rootDir, true);
- logFs.delete(walRootDir, true);
- }
-
- /**
- * Simple end-to-end test
- * @throws Exception
- */
- @Test
- public void testWALPlayer() throws Exception {
- final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
- final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
- final byte[] FAMILY = Bytes.toBytes("family");
- final byte[] COLUMN1 = Bytes.toBytes("c1");
- final byte[] COLUMN2 = Bytes.toBytes("c2");
- final byte[] ROW = Bytes.toBytes("row");
- Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
- Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
-
- // put a row into the first table
- Put p = new Put(ROW);
- p.addColumn(FAMILY, COLUMN1, COLUMN1);
- p.addColumn(FAMILY, COLUMN2, COLUMN2);
- t1.put(p);
- // delete one column
- Delete d = new Delete(ROW);
- d.addColumns(FAMILY, COLUMN1);
- t1.delete(d);
-
- // replay the WAL, map table 1 to table 2
- WAL log = cluster.getRegionServer(0).getWAL(null);
- log.rollWriter();
- String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
- .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
-
- Configuration configuration= TEST_UTIL.getConfiguration();
- WALPlayer player = new WALPlayer(configuration);
- String optionName="_test_.name";
- configuration.set(optionName, "1000");
- player.setupTime(configuration, optionName);
- assertEquals(1000,configuration.getLong(optionName,0));
- assertEquals(0, ToolRunner.run(configuration, player,
- new String[] {walInputDir, tableName1.getNameAsString(),
- tableName2.getNameAsString() }));
-
-
- // verify the WAL was replayed into table 2
- Get g = new Get(ROW);
- Result r = t2.get(g);
- assertEquals(1, r.size());
- assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
- }
-
- /**
- * Test WALKeyValueMapper setup and map
- */
- @Test
- public void testWALKeyValueMapper() throws Exception {
- testWALKeyValueMapper(WALPlayer.TABLES_KEY);
- }
-
- @Test
- public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
- testWALKeyValueMapper("hlog.input.tables");
- }
-
- private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
- Configuration configuration = new Configuration();
- configuration.set(tableConfigKey, "table");
- WALKeyValueMapper mapper = new WALKeyValueMapper();
- WALKey key = mock(WALKey.class);
- when(key.getTablename()).thenReturn(TableName.valueOf("table"));
- @SuppressWarnings("unchecked")
- Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class);
- when(context.getConfiguration()).thenReturn(configuration);
-
- WALEdit value = mock(WALEdit.class);
- ArrayList<Cell> values = new ArrayList<>();
- KeyValue kv1 = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("family"), null);
-
- values.add(kv1);
- when(value.getCells()).thenReturn(values);
- mapper.setup(context);
-
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
- KeyValue key = (KeyValue) invocation.getArguments()[1];
- assertEquals("row", Bytes.toString(writer.get()));
- assertEquals("row", Bytes.toString(CellUtil.cloneRow(key)));
- return null;
- }
- }).when(context).write(any(ImmutableBytesWritable.class), any(KeyValue.class));
-
- mapper.map(key, value, context);
-
- }
-
- /**
- * Test main method
- */
- @Test
- public void testMainMethod() throws Exception {
-
- PrintStream oldPrintStream = System.err;
- SecurityManager SECURITY_MANAGER = System.getSecurityManager();
- LauncherSecurityManager newSecurityManager= new LauncherSecurityManager();
- System.setSecurityManager(newSecurityManager);
- ByteArrayOutputStream data = new ByteArrayOutputStream();
- String[] args = {};
- System.setErr(new PrintStream(data));
- try {
- System.setErr(new PrintStream(data));
- try {
- WALPlayer.main(args);
- fail("should be SecurityException");
- } catch (SecurityException e) {
- assertEquals(-1, newSecurityManager.getExitCode());
- assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
- assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
- " <tables> [<tableMappings>]"));
- assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
- }
-
- } finally {
- System.setErr(oldPrintStream);
- System.setSecurityManager(SECURITY_MANAGER);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
deleted file mode 100644
index 34725b4..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
-import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapReduceTestUtil;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-/**
- * JUnit tests for the WALRecordReader
- */
-@Category({MapReduceTests.class, MediumTests.class})
-public class TestWALRecordReader {
- private static final Log LOG = LogFactory.getLog(TestWALRecordReader.class);
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static Configuration conf;
- private static FileSystem fs;
- private static Path hbaseDir;
- private static FileSystem walFs;
- private static Path walRootDir;
- // visible for TestHLogRecordReader
- static final TableName tableName = TableName.valueOf(getName());
- private static final byte [] rowName = tableName.getName();
- // visible for TestHLogRecordReader
- static final HRegionInfo info = new HRegionInfo(tableName,
- Bytes.toBytes(""), Bytes.toBytes(""), false);
- private static final byte [] family = Bytes.toBytes("column");
- private static final byte [] value = Bytes.toBytes("value");
- private static HTableDescriptor htd;
- private static Path logDir;
- protected MultiVersionConcurrencyControl mvcc;
- protected static NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
- private static String getName() {
- return "TestWALRecordReader";
- }
-
- @Before
- public void setUp() throws Exception {
- fs.delete(hbaseDir, true);
- walFs.delete(walRootDir, true);
- mvcc = new MultiVersionConcurrencyControl();
- }
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- // Make block sizes small.
- conf = TEST_UTIL.getConfiguration();
- conf.setInt("dfs.blocksize", 1024 * 1024);
- conf.setInt("dfs.replication", 1);
- TEST_UTIL.startMiniDFSCluster(1);
-
- conf = TEST_UTIL.getConfiguration();
- fs = TEST_UTIL.getDFSCluster().getFileSystem();
-
- hbaseDir = TEST_UTIL.createRootDir();
- walRootDir = TEST_UTIL.createWALRootDir();
- walFs = FSUtils.getWALFileSystem(conf);
- logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
-
- htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor(family));
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- fs.delete(hbaseDir, true);
- walFs.delete(walRootDir, true);
- TEST_UTIL.shutdownMiniCluster();
- }
-
- /**
- * Test partial reads from the log based on passed time range
- * @throws Exception
- */
- @Test
- public void testPartialRead() throws Exception {
- final WALFactory walfactory = new WALFactory(conf, null, getName());
- WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
- // This test depends on timestamp being millisecond based and the filename of the WAL also
- // being millisecond based.
- long ts = System.currentTimeMillis();
- WALEdit edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(info, getWalKey(ts, scopes), edit, true);
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(info, getWalKey(ts+1, scopes), edit, true);
- log.sync();
- LOG.info("Before 1st WAL roll " + log.toString());
- log.rollWriter();
- LOG.info("Past 1st WAL roll " + log.toString());
-
- Thread.sleep(1);
- long ts1 = System.currentTimeMillis();
-
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(info, getWalKey(ts1+1, scopes), edit, true);
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(info, getWalKey(ts1+2, scopes), edit, true);
- log.sync();
- log.shutdown();
- walfactory.shutdown();
- LOG.info("Closed WAL " + log.toString());
-
-
- WALInputFormat input = new WALInputFormat();
- Configuration jobConf = new Configuration(conf);
- jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
- jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
-
- // only 1st file is considered, and only its 1st entry is used
- List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
-
- assertEquals(1, splits.size());
- testSplit(splits.get(0), Bytes.toBytes("1"));
-
- jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
- jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
- splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- // both files need to be considered
- assertEquals(2, splits.size());
- // only the 2nd entry from the 1st file is used
- testSplit(splits.get(0), Bytes.toBytes("2"));
- // only the 1st entry from the 2nd file is used
- testSplit(splits.get(1), Bytes.toBytes("3"));
- }
-
- /**
- * Test basic functionality
- * @throws Exception
- */
- @Test
- public void testWALRecordReader() throws Exception {
- final WALFactory walfactory = new WALFactory(conf, null, getName());
- WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
- byte [] value = Bytes.toBytes("value");
- final AtomicLong sequenceId = new AtomicLong(0);
- WALEdit edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
- System.currentTimeMillis(), value));
- long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
- log.sync(txid);
-
- Thread.sleep(1); // make sure 2nd log gets a later timestamp
- long secondTs = System.currentTimeMillis();
- log.rollWriter();
-
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
- System.currentTimeMillis(), value));
- txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
- log.sync(txid);
- log.shutdown();
- walfactory.shutdown();
- long thirdTs = System.currentTimeMillis();
-
- // should have 2 log files now
- WALInputFormat input = new WALInputFormat();
- Configuration jobConf = new Configuration(conf);
- jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
-
- // make sure both logs are found
- List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- assertEquals(2, splits.size());
-
- // should return exactly one KV
- testSplit(splits.get(0), Bytes.toBytes("1"));
- // same for the 2nd split
- testSplit(splits.get(1), Bytes.toBytes("2"));
-
- // now test basic time ranges:
-
- // set an endtime, the 2nd log file can be ignored completely.
- jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
- splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- assertEquals(1, splits.size());
- testSplit(splits.get(0), Bytes.toBytes("1"));
-
- // now set a start time
- jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
- jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
- splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- // both logs need to be considered
- assertEquals(2, splits.size());
- // but both readers skip all edits
- testSplit(splits.get(0));
- testSplit(splits.get(1));
- }
-
- protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
- return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
- }
-
- protected WALRecordReader getReader() {
- return new WALKeyRecordReader();
- }
-
- /**
- * Create a new reader from the split, and match the edits against the passed columns.
- */
- private void testSplit(InputSplit split, byte[]... columns) throws Exception {
- final WALRecordReader reader = getReader();
- reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
-
- for (byte[] column : columns) {
- assertTrue(reader.nextKeyValue());
- Cell cell = reader.getCurrentValue().getCells().get(0);
- if (!Bytes.equals(column, 0, column.length, cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength())) {
- assertTrue(
- "expected ["
- + Bytes.toString(column)
- + "], actual ["
- + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength()) + "]", false);
- }
- }
- assertFalse(reader.nextKeyValue());
- reader.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
deleted file mode 100644
index aea5036..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TsvImporterCustomTestMapper.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.KeyValue;
-
-import java.io.IOException;
-
-/**
- * Dummy mapper used for unit tests to verify that the mapper can be injected.
- * This approach would be used if a custom transformation needed to be done after
- * reading the input data before writing it to HFiles.
- */
-public class TsvImporterCustomTestMapper extends TsvImporterMapper {
-
- @Override
- protected void setup(Context context) {
- doSetup(context);
- }
-
- /**
- * Convert a line of TSV text into an HBase table row after transforming the
- * values by multiplying them by 3.
- */
- @Override
- public void map(LongWritable offset, Text value, Context context)
- throws IOException {
- byte[] family = Bytes.toBytes("FAM");
- final byte[][] qualifiers = { Bytes.toBytes("A"), Bytes.toBytes("B") };
-
- // do some basic line parsing
- byte[] lineBytes = value.getBytes();
- String[] valueTokens = new String(lineBytes, "UTF-8").split("\u001b");
-
- // create the rowKey and Put
- ImmutableBytesWritable rowKey =
- new ImmutableBytesWritable(Bytes.toBytes(valueTokens[0]));
- Put put = new Put(rowKey.copyBytes());
- put.setDurability(Durability.SKIP_WAL);
-
- //The value should look like this: VALUE1 or VALUE2. Let's multiply
- //the integer by 3
- for(int i = 1; i < valueTokens.length; i++) {
- String prefix = valueTokens[i].substring(0, "VALUE".length());
- String suffix = valueTokens[i].substring("VALUE".length());
- String newValue = prefix + Integer.parseInt(suffix) * 3;
-
- KeyValue kv = new KeyValue(rowKey.copyBytes(), family,
- qualifiers[i-1], Bytes.toBytes(newValue));
- put.add(kv);
- }
-
- try {
- context.write(rowKey, put);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-}