Posted to commits@hbase.apache.org by bu...@apache.org on 2017/08/27 05:33:35 UTC
[35/50] [abbrv] hbase git commit: HBASE-18640 Move mapreduce out of hbase-server into separate module.
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
new file mode 100644
index 0000000..e669f14
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/ScanPerformanceEvaluation.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch;
+
+/**
+ * A simple performance evaluation tool for single client and MR scans
+ * and snapshot scans.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class ScanPerformanceEvaluation extends AbstractHBaseTool {
+
+ private static final String HBASE_COUNTER_GROUP_NAME = "HBase Counters";
+
+ private String type;
+ private String file;
+ private String tablename;
+ private String snapshotName;
+ private String restoreDir;
+ private String caching;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ Path rootDir;
+ try {
+ rootDir = FSUtils.getRootDir(conf);
+ rootDir.getFileSystem(conf);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ protected void addOptions() {
+ this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
+ this.addOptWithArg("f", "file", "the filename to read from");
+ this.addOptWithArg("tn", "table", "the tablename to read from");
+ this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
+ this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
+ this.addOptWithArg("ch", "caching", "scanner caching value");
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ type = cmd.getOptionValue("type");
+ file = cmd.getOptionValue("file");
+ tablename = cmd.getOptionValue("table");
+ snapshotName = cmd.getOptionValue("snapshot");
+ restoreDir = cmd.getOptionValue("restoredir");
+ caching = cmd.getOptionValue("caching");
+ }
+
+ protected void testHdfsStreaming(Path filename) throws IOException {
+ byte[] buf = new byte[1024];
+ FileSystem fs = filename.getFileSystem(getConf());
+
+ // read the file from start to finish
+ Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
+ Stopwatch streamTimer = Stopwatch.createUnstarted();
+
+ fileOpenTimer.start();
+ FSDataInputStream in = fs.open(filename);
+ fileOpenTimer.stop();
+
+ long totalBytes = 0;
+ streamTimer.start();
+ while (true) {
+ int read = in.read(buf);
+ if (read < 0) {
+ break;
+ }
+ totalBytes += read;
+ }
+ streamTimer.stop();
+
+ double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS);
+
+ System.out.println("HDFS streaming: ");
+ System.out.println("total time to open: " +
+ fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ }
+
+ private Scan getScan() {
+ Scan scan = new Scan(); // default scan settings
+ scan.setCacheBlocks(false);
+ scan.setMaxVersions(1);
+ scan.setScanMetricsEnabled(true);
+ if (caching != null) {
+ scan.setCaching(Integer.parseInt(caching));
+ }
+
+ return scan;
+ }
+
+ public void testScan() throws IOException {
+ Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
+ Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+ Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+ tableOpenTimer.start();
+ Connection connection = ConnectionFactory.createConnection(getConf());
+ Table table = connection.getTable(TableName.valueOf(tablename));
+ tableOpenTimer.stop();
+
+ Scan scan = getScan();
+ scanOpenTimer.start();
+ ResultScanner scanner = table.getScanner(scan);
+ scanOpenTimer.stop();
+
+ long numRows = 0;
+ long numCells = 0;
+ scanTimer.start();
+ while (true) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ numRows++;
+
+ numCells += result.rawCells().length;
+ }
+ scanTimer.stop();
+ scanner.close();
+ table.close();
+ connection.close();
+
+ ScanMetrics metrics = scan.getScanMetrics();
+ long totalBytes = metrics.countOfBytesInResults.get();
+ double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan: ");
+ System.out.println("total time to open table: " +
+ tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to open scanner: " +
+ scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to scan: " +
+ scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+ System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+ }
+
+
+ public void testSnapshotScan() throws IOException {
+ Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
+ Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+ Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+ Path restoreDir = new Path(this.restoreDir);
+
+ snapshotRestoreTimer.start();
+ restoreDir.getFileSystem(conf).delete(restoreDir, true);
+ snapshotRestoreTimer.stop();
+
+ Scan scan = getScan();
+ scanOpenTimer.start();
+ TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+ scanOpenTimer.stop();
+
+ long numRows = 0;
+ long numCells = 0;
+ scanTimer.start();
+ while (true) {
+ Result result = scanner.next();
+ if (result == null) {
+ break;
+ }
+ numRows++;
+
+ numCells += result.rawCells().length;
+ }
+ scanTimer.stop();
+ scanner.close();
+
+ ScanMetrics metrics = scanner.getScanMetrics();
+ long totalBytes = metrics.countOfBytesInResults.get();
+ double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan snapshot: ");
+ System.out.println("total time to restore snapshot: " +
+ snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to open scanner: " +
+ scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to scan: " +
+ scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+ System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+
+ }
+
+ public static enum ScanCounter {
+ NUM_ROWS,
+ NUM_CELLS,
+ }
+
+ public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value,
+ Context context) throws IOException,
+ InterruptedException {
+ context.getCounter(ScanCounter.NUM_ROWS).increment(1);
+ context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
+ }
+ }
+
+ public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+ Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+ Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+ Scan scan = getScan();
+
+ String jobName = "testScanMapReduce";
+
+ Job job = new Job(conf);
+ job.setJobName(jobName);
+
+ job.setJarByClass(getClass());
+
+ TableMapReduceUtil.initTableMapperJob(
+ this.tablename,
+ scan,
+ MyMapper.class,
+ NullWritable.class,
+ NullWritable.class,
+ job
+ );
+
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ scanTimer.start();
+ job.waitForCompletion(true);
+ scanTimer.stop();
+
+ Counters counters = job.getCounters();
+ long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+ long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+
+ long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+ double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan mapreduce: ");
+ System.out.println("total time to open scanner: " +
+ scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+ }
+
+ public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
+ Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
+ Stopwatch scanTimer = Stopwatch.createUnstarted();
+
+ Scan scan = getScan();
+
+ String jobName = "testSnapshotScanMapReduce";
+
+ Job job = new Job(conf);
+ job.setJobName(jobName);
+
+ job.setJarByClass(getClass());
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(
+ this.snapshotName,
+ scan,
+ MyMapper.class,
+ NullWritable.class,
+ NullWritable.class,
+ job,
+ true,
+ new Path(restoreDir)
+ );
+
+ job.setNumReduceTasks(0);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ scanTimer.start();
+ job.waitForCompletion(true);
+ scanTimer.stop();
+
+ Counters counters = job.getCounters();
+ long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
+ long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
+
+ long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
+ double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
+ double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
+
+ System.out.println("HBase scan mapreduce: ");
+ System.out.println("total time to open scanner: " +
+ scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
+
+ System.out.println("total bytes: " + totalBytes + " bytes ("
+ + StringUtils.humanReadableInt(totalBytes) + ")");
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
+ System.out.println("total rows : " + numRows);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
+ System.out.println("total cells : " + numCells);
+ System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
+ }
+
+ @Override
+ protected int doWork() throws Exception {
+ if (type.equals("streaming")) {
+ testHdfsStreaming(new Path(file));
+ } else if (type.equals("scan")){
+ testScan();
+ } else if (type.equals("snapshotscan")) {
+ testSnapshotScan();
+ } else if (type.equals("scanmapreduce")) {
+ testScanMapReduce();
+ } else if (type.equals("snapshotscanmapreduce")) {
+ testSnapshotScanMapReduce();
+ }
+ return 0;
+ }
+
+ public static void main (String[] args) throws Exception {
+ int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
+ System.exit(ret);
+ }
+}
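
For reference, a minimal sketch of driving the tool above once it is on the classpath; the table name "testtable" and the caching value are hypothetical, and the flags correspond to the options declared in addOptions() (roughly: -t scan -tn testtable -ch 100). The other type values (snapshotscan, scanmapreduce, snapshotscanmapreduce) would additionally take -sn and -rs.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ScanPerformanceEvaluation;
import org.apache.hadoop.util.ToolRunner;

public class ScanPerfDemo {
  public static void main(String[] args) throws Exception {
    // Single-client scan of a hypothetical table "testtable", scanner caching set to 100 rows.
    String[] toolArgs = { "-t", "scan", "-tn", "testtable", "-ch", "100" };
    int exit = ToolRunner.run(HBaseConfiguration.create(),
        new ScanPerformanceEvaluation(), toolArgs);
    System.exit(exit);
  }
}
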
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
new file mode 100644
index 0000000..86a3d3f
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Random;
+import java.util.LinkedList;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.PerformanceEvaluation.RandomReadTest;
+import org.apache.hadoop.hbase.PerformanceEvaluation.TestOptions;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.UniformReservoir;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestPerformanceEvaluation {
+ private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+ @Test
+ public void testSerialization()
+ throws JsonGenerationException, JsonMappingException, IOException {
+ PerformanceEvaluation.TestOptions options = new PerformanceEvaluation.TestOptions();
+ assertTrue(!options.isAutoFlush());
+ options.setAutoFlush(true);
+ ObjectMapper mapper = new ObjectMapper();
+ String optionsString = mapper.writeValueAsString(options);
+ PerformanceEvaluation.TestOptions optionsDeserialized =
+ mapper.readValue(optionsString, PerformanceEvaluation.TestOptions.class);
+ assertTrue(optionsDeserialized.isAutoFlush());
+ }
+
+ /**
+ * Exercise the mr spec writing. Simple assertions to make sure it is basically working.
+ * @throws IOException
+ */
+ @Ignore @Test
+ public void testWriteInputFile() throws IOException {
+ TestOptions opts = new PerformanceEvaluation.TestOptions();
+ final int clients = 10;
+ opts.setNumClientThreads(clients);
+ opts.setPerClientRunRows(10);
+ Path dir =
+ PerformanceEvaluation.writeInputFile(HTU.getConfiguration(), opts, HTU.getDataTestDir());
+ FileSystem fs = FileSystem.get(HTU.getConfiguration());
+ Path p = new Path(dir, PerformanceEvaluation.JOB_INPUT_FILENAME);
+ long len = fs.getFileStatus(p).getLen();
+ assertTrue(len > 0);
+ byte [] content = new byte[(int)len];
+ FSDataInputStream dis = fs.open(p);
+ try {
+ dis.readFully(content);
+ BufferedReader br =
+ new BufferedReader(new InputStreamReader(new ByteArrayInputStream(content)));
+ int count = 0;
+ while (br.readLine() != null) {
+ count++;
+ }
+ assertEquals(clients, count);
+ } finally {
+ dis.close();
+ }
+ }
+
+ @Test
+ public void testSizeCalculation() {
+ TestOptions opts = new PerformanceEvaluation.TestOptions();
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ int rows = opts.getPerClientRunRows();
+ // Default row count
+ final int defaultPerClientRunRows = 1024 * 1024;
+ assertEquals(defaultPerClientRunRows, rows);
+ // If size is 2G, then twice the row count.
+ opts.setSize(2.0f);
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows());
+ // If two clients, then they get half the rows each.
+ opts.setNumClientThreads(2);
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ assertEquals(defaultPerClientRunRows, opts.getPerClientRunRows());
+ // What if valueSize is 'random'? Then half of the valueSize so twice the rows.
+ opts.valueRandom = true;
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows());
+ }
+
+ @Test
+ public void testRandomReadCalculation() {
+ TestOptions opts = new PerformanceEvaluation.TestOptions();
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ int rows = opts.getPerClientRunRows();
+ // Default row count
+ final int defaultPerClientRunRows = 1024 * 1024;
+ assertEquals(defaultPerClientRunRows, rows);
+ // If size is 2G, then twice the row count.
+ opts.setSize(2.0f);
+ opts.setPerClientRunRows(1000);
+ opts.setCmdName(PerformanceEvaluation.RANDOM_READ);
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ assertEquals(1000, opts.getPerClientRunRows());
+ // If two clients, then they get half the rows each.
+ opts.setNumClientThreads(2);
+ opts = PerformanceEvaluation.calculateRowsAndSize(opts);
+ assertEquals(1000, opts.getPerClientRunRows());
+ Random random = new Random();
+ // assuming we will get one before this loop expires
+ boolean foundValue = false;
+ for (int i = 0; i < 10000000; i++) {
+ int randomRow = PerformanceEvaluation.generateRandomRow(random, opts.totalRows);
+ if (randomRow > 1000) {
+ foundValue = true;
+ break;
+ }
+ }
+ assertTrue("We need to get a value more than 1000", foundValue);
+ }
+
+ @Test
+ public void testZipfian()
+ throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException,
+ IllegalArgumentException, InvocationTargetException {
+ TestOptions opts = new PerformanceEvaluation.TestOptions();
+ opts.setValueZipf(true);
+ final int valueSize = 1024;
+ opts.setValueSize(valueSize);
+ RandomReadTest rrt = new RandomReadTest(null, opts, null);
+ Constructor<?> ctor =
+ Histogram.class.getDeclaredConstructor(com.codahale.metrics.Reservoir.class);
+ ctor.setAccessible(true);
+ Histogram histogram = (Histogram)ctor.newInstance(new UniformReservoir(1024 * 500));
+ for (int i = 0; i < 100; i++) {
+ histogram.update(rrt.getValueLength(null));
+ }
+ Snapshot snapshot = histogram.getSnapshot();
+ double stddev = snapshot.getStdDev();
+ assertTrue(stddev != 0 && stddev != 1.0);
+ assertTrue(snapshot.getStdDev() != 0);
+ double median = snapshot.getMedian();
+ assertTrue(median != 0 && median != 1 && median != valueSize);
+ }
+
+ @Test
+ public void testParseOptsWithThreads() {
+ Queue<String> opts = new LinkedList<>();
+ String cmdName = "sequentialWrite";
+ int threads = 1;
+ opts.offer(cmdName);
+ opts.offer(String.valueOf(threads));
+ PerformanceEvaluation.TestOptions options = PerformanceEvaluation.parseOpts(opts);
+ assertNotNull(options);
+ assertNotNull(options.getCmdName());
+ assertEquals(cmdName, options.getCmdName());
+ assertEquals(threads, options.getNumClientThreads());
+ }
+
+ @Test
+ public void testParseOptsWrongThreads() {
+ Queue<String> opts = new LinkedList<>();
+ String cmdName = "sequentialWrite";
+ opts.offer(cmdName);
+ opts.offer("qq");
+ try {
+ PerformanceEvaluation.parseOpts(opts);
+ } catch (IllegalArgumentException e) {
+ System.out.println(e.getMessage());
+ assertEquals("Command " + cmdName + " does not have threads number", e.getMessage());
+ assertTrue(e.getCause() instanceof NumberFormatException);
+ }
+ }
+
+ @Test
+ public void testParseOptsNoThreads() {
+ Queue<String> opts = new LinkedList<>();
+ String cmdName = "sequentialWrite";
+ try {
+ PerformanceEvaluation.parseOpts(opts);
+ } catch (IllegalArgumentException e) {
+ System.out.println(e.getMessage());
+ assertEquals("Command " + cmdName + " does not have threads number", e.getMessage());
+ assertTrue(e.getCause() instanceof NoSuchElementException);
+ }
+ }
+}
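
A small sketch of the Jackson round trip that testSerialization exercises above; that the MR job input file carries one serialized TestOptions per client line is an assumption suggested by the line count asserted in testWriteInputFile.

import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.codehaus.jackson.map.ObjectMapper;

public class TestOptionsRoundTripDemo {
  public static void main(String[] args) throws Exception {
    PerformanceEvaluation.TestOptions opts = new PerformanceEvaluation.TestOptions();
    opts.setNumClientThreads(10);  // same setters the tests above use
    opts.setPerClientRunRows(10);

    ObjectMapper mapper = new ObjectMapper();
    String line = mapper.writeValueAsString(opts);  // one JSON line per client (assumption)
    PerformanceEvaluation.TestOptions copy =
        mapper.readValue(line, PerformanceEvaluation.TestOptions.class);
    System.out.println(copy.getPerClientRunRows());  // prints 10
  }
}
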
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
new file mode 100644
index 0000000..d085c21
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.util.ProgramDriver;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestDriver {
+
+ @Test
+ public void testDriverMainMethod() throws Throwable {
+ ProgramDriver programDriverMock = mock(ProgramDriver.class);
+ Driver.setProgramDriver(programDriverMock);
+ Driver.main(new String[]{});
+ verify(programDriverMock).driver(Mockito.any(String[].class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
new file mode 100644
index 0000000..7131cf9
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
@@ -0,0 +1,181 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestGroupingTableMap {
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes()
+ throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ Result result = mock(Result.class);
+ Reporter reporter = mock(Reporter.class);
+ gTableMap = new GroupingTableMap();
+ Configuration cfg = new Configuration();
+ cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+ JobConf jobConf = new JobConf(cfg);
+ gTableMap.configure(jobConf);
+
+ byte[] row = {};
+ List<Cell> keyValues = ImmutableList.<Cell>of(
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("2222")),
+ new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("3333")));
+ when(result.listCells()).thenReturn(keyValues);
+ OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+ mock(OutputCollector.class);
+ gTableMap.map(null, result, outputCollectorMock, reporter);
+ verify(result).listCells();
+ verifyZeroInteractions(outputCollectorMock);
+ } finally {
+ if (gTableMap != null)
+ gTableMap.close();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ Result result = mock(Result.class);
+ Reporter reporter = mock(Reporter.class);
+ gTableMap = new GroupingTableMap();
+ Configuration cfg = new Configuration();
+ cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+ JobConf jobConf = new JobConf(cfg);
+ gTableMap.configure(jobConf);
+
+ byte[] row = {};
+ List<Cell> keyValues = ImmutableList.<Cell>of(
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
+ new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("2222")),
+ new KeyValue(row, "familyC".getBytes(), "qualifierC".getBytes(), Bytes.toBytes("3333")));
+ when(result.listCells()).thenReturn(keyValues);
+ OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+ mock(OutputCollector.class);
+ gTableMap.map(null, result, outputCollectorMock, reporter);
+ verify(result).listCells();
+ verify(outputCollectorMock, times(1))
+ .collect(any(ImmutableBytesWritable.class), any(Result.class));
+ verifyNoMoreInteractions(outputCollectorMock);
+ } finally {
+ if (gTableMap != null)
+ gTableMap.close();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation" })
+ public void shouldCreateNewKey() throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ Result result = mock(Result.class);
+ Reporter reporter = mock(Reporter.class);
+ final byte[] bSeparator = Bytes.toBytes(" ");
+ gTableMap = new GroupingTableMap();
+ Configuration cfg = new Configuration();
+ cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+ JobConf jobConf = new JobConf(cfg);
+ gTableMap.configure(jobConf);
+
+ final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
+ final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
+ byte[] row = {};
+ List<Cell> cells = ImmutableList.<Cell>of(
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), firstPartKeyValue),
+ new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), secondPartKeyValue));
+ when(result.listCells()).thenReturn(cells);
+
+ final AtomicBoolean outputCollected = new AtomicBoolean();
+ OutputCollector<ImmutableBytesWritable, Result> outputCollector =
+ new OutputCollector<ImmutableBytesWritable, Result>() {
+ @Override
+ public void collect(ImmutableBytesWritable arg, Result result) throws IOException {
+ assertArrayEquals(org.apache.hadoop.hbase.shaded.com.google.common.primitives.
+ Bytes.concat(firstPartKeyValue, bSeparator,
+ secondPartKeyValue), arg.copyBytes());
+ outputCollected.set(true);
+ }
+ };
+
+ gTableMap.map(null, result, outputCollector, reporter);
+ verify(result).listCells();
+ Assert.assertTrue("Output not received", outputCollected.get());
+
+ final byte[] firstPartValue = Bytes.toBytes("238947928");
+ final byte[] secondPartValue = Bytes.toBytes("4678456942345");
+ byte[][] data = { firstPartValue, secondPartValue };
+ ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
+ assertArrayEquals(org.apache.hadoop.hbase.shaded.com.google.common.primitives.
+ Bytes.concat(firstPartValue,
+ bSeparator, secondPartValue), byteWritable.get());
+ } finally {
+ if (gTableMap != null)
+ gTableMap.close();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation" })
+ public void shouldReturnNullFromCreateGroupKey() throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ gTableMap = new GroupingTableMap();
+ assertNull(gTableMap.createGroupKey(null));
+ } finally {
+ if(gTableMap != null)
+ gTableMap.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
new file mode 100644
index 0000000..e222d0b
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestIdentityTableMap {
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldCollectPredefinedTimes() throws IOException {
+ int recordNumber = 999;
+ Result resultMock = mock(Result.class);
+ IdentityTableMap identityTableMap = null;
+ try {
+ Reporter reporterMock = mock(Reporter.class);
+ identityTableMap = new IdentityTableMap();
+ ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
+ OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+ mock(OutputCollector.class);
+
+ for (int i = 0; i < recordNumber; i++)
+ identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
+ reporterMock);
+
+ verify(outputCollectorMock, times(recordNumber)).collect(
+ Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
+ } finally {
+ if (identityTableMap != null)
+ identityTableMap.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
new file mode 100644
index 0000000..665c547
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestMultiTableSnapshotInputFormat.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+@Category({ VerySlowMapReduceTests.class, LargeTests.class })
+public class TestMultiTableSnapshotInputFormat
+ extends org.apache.hadoop.hbase.mapreduce.TestMultiTableSnapshotInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestMultiTableSnapshotInputFormat.class);
+
+ @Override
+ protected void runJob(String jobName, Configuration c, List<Scan> scans)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ JobConf job = new JobConf(TEST_UTIL.getConfiguration());
+
+ job.setJobName(jobName);
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
+
+ TableMapReduceUtil.initMultiTableSnapshotMapperJob(getSnapshotScanMapping(scans), Mapper.class,
+ ImmutableBytesWritable.class, ImmutableBytesWritable.class, job, true, restoreDir);
+
+ TableMapReduceUtil.addDependencyJars(job);
+
+ job.setReducerClass(Reducer.class);
+ job.setNumReduceTasks(1); // one to get final "first" and "last" key
+ FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+ LOG.info("Started " + job.getJobName());
+
+ RunningJob runningJob = JobClient.runJob(job);
+ runningJob.waitForCompletion();
+ assertTrue(runningJob.isSuccessful());
+ LOG.info("After map/reduce completion - job " + jobName);
+ }
+
+ public static class Mapper extends TestMultiTableSnapshotInputFormat.ScanMapper
+ implements TableMap<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ OutputCollector<ImmutableBytesWritable, ImmutableBytesWritable> outputCollector,
+ Reporter reporter) throws IOException {
+ makeAssertions(key, value);
+ outputCollector.collect(key, key);
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+
+ }
+ }
+
+ public static class Reducer extends TestMultiTableSnapshotInputFormat.ScanReducer implements
+ org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+ NullWritable, NullWritable> {
+
+ private JobConf jobConf;
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<ImmutableBytesWritable> values,
+ OutputCollector<NullWritable, NullWritable> outputCollector, Reporter reporter)
+ throws IOException {
+ makeAssertions(key, Lists.newArrayList(values));
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated
+ * with it. If the stream is already closed then invoking this
+ * method has no effect.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ super.cleanup(this.jobConf);
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
new file mode 100644
index 0000000..4ebd8bf
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
@@ -0,0 +1,163 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestRowCounter {
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldPrintUsage() throws Exception {
+ String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
+ String result = new OutputReader(System.out) {
+ @Override
+ void doRead() {
+ assertEquals(-1, RowCounter.printUsage());
+ }
+ }.read();
+
+ assertTrue(result.startsWith(expectedOutput));
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
+ throws Exception {
+ final String[] args = new String[] { "one", "two" };
+ String line = "ERROR: Wrong number of parameters: " + args.length;
+ String result = new OutputReader(System.err) {
+ @Override
+ void doRead() throws Exception {
+ assertEquals(-1, new RowCounter().run(args));
+ }
+ }.read();
+
+ assertTrue(result.startsWith(line));
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldRegInReportEveryIncomingRow() throws IOException {
+ int iterationNumber = 999;
+ RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
+ Reporter reporter = mock(Reporter.class);
+ for (int i = 0; i < iterationNumber; i++)
+ mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
+ mock(OutputCollector.class), reporter);
+
+ Mockito.verify(reporter, times(iterationNumber)).incrCounter(
+ any(Enum.class), anyInt());
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation" })
+ public void shouldCreateAndRunSubmittableJob() throws Exception {
+ RowCounter rCounter = new RowCounter();
+ rCounter.setConf(HBaseConfiguration.create());
+ String[] args = new String[] { "\temp", "tableA", "column1", "column2",
+ "column3" };
+ JobConf jobConfig = rCounter.createSubmittableJob(args);
+
+ assertNotNull(jobConfig);
+ assertEquals(0, jobConfig.getNumReduceTasks());
+ assertEquals("rowcounter", jobConfig.getJobName());
+ assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
+ assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
+ assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ')
+ .join("column1", "column2", "column3"));
+ assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
+ }
+
+ enum Outs {
+ OUT, ERR
+ }
+
+ private static abstract class OutputReader {
+ private final PrintStream ps;
+ private PrintStream oldPrintStream;
+ private Outs outs;
+
+ protected OutputReader(PrintStream ps) {
+ this.ps = ps;
+ }
+
+ protected String read() throws Exception {
+ ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+ if (ps == System.out) {
+ oldPrintStream = System.out;
+ outs = Outs.OUT;
+ System.setOut(new PrintStream(outBytes));
+ } else if (ps == System.err) {
+ oldPrintStream = System.err;
+ outs = Outs.ERR;
+ System.setErr(new PrintStream(outBytes));
+ } else {
+ throw new IllegalStateException("OutputReader: unsupported PrintStream");
+ }
+
+ try {
+ doRead();
+ return new String(outBytes.toByteArray());
+ } finally {
+ switch (outs) {
+ case OUT: {
+ System.setOut(oldPrintStream);
+ break;
+ }
+ case ERR: {
+ System.setErr(oldPrintStream);
+ break;
+ }
+ default:
+ throw new IllegalStateException(
+ "OutputReader: unsupported PrintStream");
+ }
+ }
+ }
+
+ abstract void doRead() throws Exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
new file mode 100644
index 0000000..2655ac2
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({MapReduceTests.class, SmallTests.class})
+public class TestSplitTable {
+ @Rule
+ public TestName name = new TestName();
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testSplitTableCompareTo() {
+ TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+ Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA");
+
+ TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+ Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA");
+
+ TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+ Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA");
+
+ assertTrue(aTableSplit.compareTo(aTableSplit) == 0);
+ assertTrue(bTableSplit.compareTo(bTableSplit) == 0);
+ assertTrue(cTableSplit.compareTo(cTableSplit) == 0);
+
+ assertTrue(aTableSplit.compareTo(bTableSplit) < 0);
+ assertTrue(bTableSplit.compareTo(aTableSplit) > 0);
+
+ assertTrue(aTableSplit.compareTo(cTableSplit) < 0);
+ assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
+
+ assertTrue(bTableSplit.compareTo(cTableSplit) < 0);
+ assertTrue(cTableSplit.compareTo(bTableSplit) > 0);
+
+ assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testSplitTableEquals() {
+ byte[] tableA = Bytes.toBytes("tableA");
+ byte[] aaa = Bytes.toBytes("aaa");
+ byte[] ddd = Bytes.toBytes("ddd");
+ String locationA = "locationA";
+
+ TableSplit tablesplit = new TableSplit(tableA, aaa, ddd, locationA);
+
+ TableSplit tableB = new TableSplit(Bytes.toBytes("tableB"), aaa, ddd, locationA);
+ assertNotEquals(tablesplit.hashCode(), tableB.hashCode());
+ assertNotEquals(tablesplit, tableB);
+
+ TableSplit startBbb = new TableSplit(tableA, Bytes.toBytes("bbb"), ddd, locationA);
+ assertNotEquals(tablesplit.hashCode(), startBbb.hashCode());
+ assertNotEquals(tablesplit, startBbb);
+
+ TableSplit endEee = new TableSplit(tableA, aaa, Bytes.toBytes("eee"), locationA);
+ assertNotEquals(tablesplit.hashCode(), endEee.hashCode());
+ assertNotEquals(tablesplit, endEee);
+
+ TableSplit locationB = new TableSplit(tableA, aaa, ddd, "locationB");
+ assertNotEquals(tablesplit.hashCode(), locationB.hashCode());
+ assertNotEquals(tablesplit, locationB);
+
+ TableSplit same = new TableSplit(tableA, aaa, ddd, locationA);
+ assertEquals(tablesplit.hashCode(), same.hashCode());
+ assertEquals(tablesplit, same);
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testToString() {
+ TableSplit split =
+ new TableSplit(TableName.valueOf(name.getMethodName()), "row-start".getBytes(), "row-end".getBytes(),
+ "location");
+ String str =
+ "HBase table split(table name: " + name.getMethodName() + ", start row: row-start, "
+ + "end row: row-end, region location: location)";
+ Assert.assertEquals(str, split.toString());
+
+ split = new TableSplit((TableName) null, null, null, null);
+ str =
+ "HBase table split(table name: null, start row: null, "
+ + "end row: null, region location: null)";
+ Assert.assertEquals(str, split.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
new file mode 100644
index 0000000..f39a7f5
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableInputFormat.java
@@ -0,0 +1,460 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This tests the TableInputFormat and its recovery semantics
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestTableInputFormat {
+
+ private static final Log LOG = LogFactory.getLog(TestTableInputFormat.class);
+
+ private final static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ static final byte[] FAMILY = Bytes.toBytes("family");
+
+ private static final byte[][] columns = new byte[][] { FAMILY };
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws IOException {
+ LOG.info("before");
+ UTIL.ensureSomeRegionServersAvailable(1);
+ LOG.info("before done");
+ }
+
+ /**
+ * Set up a table with two rows and values.
+ *
+ * @param tableName the name of the table to create
+ * @return a Table instance for the created table
+ * @throws IOException if table creation or the puts fail
+ */
+ public static Table createTable(byte[] tableName) throws IOException {
+ return createTable(tableName, new byte[][] { FAMILY });
+ }
+
+ /**
+ * Set up a table with two rows and values per column family.
+ *
+ * @param tableName the name of the table to create
+ * @param families the column families to create
+ * @return a Table instance for the created table
+ * @throws IOException if table creation or the puts fail
+ */
+ public static Table createTable(byte[] tableName, byte[][] families) throws IOException {
+ Table table = UTIL.createTable(TableName.valueOf(tableName), families);
+ Put p = new Put("aaa".getBytes());
+ for (byte[] family : families) {
+ p.addColumn(family, null, "value aaa".getBytes());
+ }
+ table.put(p);
+ p = new Put("bbb".getBytes());
+ for (byte[] family : families) {
+ p.addColumn(family, null, "value bbb".getBytes());
+ }
+ table.put(p);
+ return table;
+ }
+
+ /**
+ * Verify that the result and key have expected values.
+ *
+ * @param r the single-row Result to check
+ * @param key the row key that was returned
+ * @param expectedKey the expected row key
+ * @param expectedValue the expected cell value
+ * @return true if the assertions pass
+ */
+ static boolean checkResult(Result r, ImmutableBytesWritable key,
+ byte[] expectedKey, byte[] expectedValue) {
+ assertEquals(0, key.compareTo(expectedKey));
+ Map<byte[], byte[]> vals = r.getFamilyMap(FAMILY);
+ byte[] value = vals.values().iterator().next();
+ assertTrue(Arrays.equals(value, expectedValue));
+ return true; // success
+ }
+
+ /**
+ * Create table data and run tests on the specified table using the
+ * o.a.h.hbase.mapred API.
+ *
+ * @param table the table to read from
+ * @throws IOException if the scan or the assertions fail
+ */
+ static void runTestMapred(Table table) throws IOException {
+ org.apache.hadoop.hbase.mapred.TableRecordReader trr =
+ new org.apache.hadoop.hbase.mapred.TableRecordReader();
+ trr.setStartRow("aaa".getBytes());
+ trr.setEndRow("zzz".getBytes());
+ trr.setHTable(table);
+ trr.setInputColumns(columns);
+
+ trr.init();
+ Result r = new Result();
+ ImmutableBytesWritable key = new ImmutableBytesWritable();
+
+ boolean more = trr.next(key, r);
+ assertTrue(more);
+ checkResult(r, key, "aaa".getBytes(), "value aaa".getBytes());
+
+ more = trr.next(key, r);
+ assertTrue(more);
+ checkResult(r, key, "bbb".getBytes(), "value bbb".getBytes());
+
+ // no more data
+ more = trr.next(key, r);
+ assertFalse(more);
+ }
+
+ /**
+ * Create a table whose scanner throws an IOException on the first next() call.
+ *
+ * @param name the table name
+ * @param failCnt the number of getScanner() calls that should return a failing scanner
+ * @throws IOException if table creation fails
+ */
+ static Table createIOEScannerTable(byte[] name, final int failCnt)
+ throws IOException {
+ // build up a mock scanner stuff to fail the first time
+ Answer<ResultScanner> a = new Answer<ResultScanner>() {
+ int cnt = 0;
+
+ @Override
+ public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
+ // first invocation return the busted mock scanner
+ if (cnt++ < failCnt) {
+ // create mock ResultScanner that always fails.
+ Scan scan = mock(Scan.class);
+ doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
+ ResultScanner scanner = mock(ResultScanner.class);
+ // simulate TimeoutException / IOException
+ doThrow(new IOException("Injected exception")).when(scanner).next();
+ return scanner;
+ }
+
+ // otherwise return the real scanner.
+ return (ResultScanner) invocation.callRealMethod();
+ }
+ };
+
+ Table htable = spy(createTable(name));
+ doAnswer(a).when(htable).getScanner((Scan) anyObject());
+ return htable;
+ }
+
+ /**
+ * Create a table whose scanner throws a DoNotRetryIOException on the first
+ * next() call.
+ *
+ * @param name the table name
+ * @param failCnt the number of getScanner() calls that should return a failing scanner
+ * @throws IOException if table creation fails
+ */
+ static Table createDNRIOEScannerTable(byte[] name, final int failCnt)
+ throws IOException {
+ // build up a mock scanner stuff to fail the first time
+ Answer<ResultScanner> a = new Answer<ResultScanner>() {
+ int cnt = 0;
+
+ @Override
+ public ResultScanner answer(InvocationOnMock invocation) throws Throwable {
+ // first invocation return the busted mock scanner
+ if (cnt++ < failCnt) {
+ // create mock ResultScanner that always fails.
+ Scan scan = mock(Scan.class);
+ doReturn("bogus".getBytes()).when(scan).getStartRow(); // avoid npe
+ ResultScanner scanner = mock(ResultScanner.class);
+
+ invocation.callRealMethod(); // also exercise the real getScanner
+ // simulate a NotServingRegionException on next()
+ doThrow(
+ new NotServingRegionException("Injected simulated TimeoutException"))
+ .when(scanner).next();
+ return scanner;
+ }
+
+ // otherwise return the real scanner.
+ return (ResultScanner) invocation.callRealMethod();
+ }
+ };
+
+ Table htable = spy(createTable(name));
+ doAnswer(a).when(htable).getScanner((Scan) anyObject());
+ return htable;
+ }
+
+ /**
+ * Run test assuming no errors using mapred api.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testTableRecordReader() throws IOException {
+ Table table = createTable("table1".getBytes());
+ runTestMapred(table);
+ }
+
+ /**
+ * Run test assuming Scanner IOException failure using mapred api.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testTableRecordReaderScannerFail() throws IOException {
+ Table htable = createIOEScannerTable("table2".getBytes(), 1);
+ runTestMapred(htable);
+ }
+
+ /**
+ * Run test assuming two consecutive Scanner IOException failures using mapred api.
+ *
+ * @throws IOException
+ */
+ @Test(expected = IOException.class)
+ public void testTableRecordReaderScannerFailTwice() throws IOException {
+ Table htable = createIOEScannerTable("table3".getBytes(), 2);
+ runTestMapred(htable);
+ }
+
+ /**
+ * Run test assuming NotServingRegionException using mapred api.
+ *
+ * @throws org.apache.hadoop.hbase.DoNotRetryIOException
+ */
+ @Test
+ public void testTableRecordReaderScannerTimeout() throws IOException {
+ Table htable = createDNRIOEScannerTable("table4".getBytes(), 1);
+ runTestMapred(htable);
+ }
+
+ /**
+ * Run test assuming NotServingRegionException using mapred api.
+ *
+ * @throws org.apache.hadoop.hbase.DoNotRetryIOException
+ */
+ @Test(expected = org.apache.hadoop.hbase.NotServingRegionException.class)
+ public void testTableRecordReaderScannerTimeoutTwice() throws IOException {
+ Table htable = createDNRIOEScannerTable("table5".getBytes(), 2);
+ runTestMapred(htable);
+ }
+
+ /**
+ * Verify the example we present in javadocs on TableInputFormatBase
+ */
+ @Test
+ public void testExtensionOfTableInputFormatBase() throws IOException {
+ LOG.info("testing use of an InputFormat that extends TableInputFormatBase");
+ final Table table = createTable(Bytes.toBytes("exampleTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleTIF.class);
+ }
+
+ @Test
+ public void testDeprecatedExtensionOfTableInputFormatBase() throws IOException {
+ LOG.info("testing use of an InputFormat that extends TableInputFormatBase, "
+ + "as it was given in 0.98.");
+ final Table table = createTable(Bytes.toBytes("exampleDeprecatedTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleDeprecatedTIF.class);
+ }
+
+ @Test
+ public void testJobConfigurableExtensionOfTableInputFormatBase() throws IOException {
+ LOG.info("testing use of an InputFormat that extends TableInputFormatBase, "
+ + "using JobConfigurable.");
+ final Table table = createTable(Bytes.toBytes("exampleJobConfigurableTable"),
+ new byte[][] { Bytes.toBytes("columnA"), Bytes.toBytes("columnB") });
+ testInputFormat(ExampleJobConfigurableTIF.class);
+ }
+
+ void testInputFormat(Class<? extends InputFormat> clazz) throws IOException {
+ Configuration conf = UTIL.getConfiguration();
+ final JobConf job = new JobConf(conf);
+ job.setInputFormat(clazz);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setMapperClass(ExampleVerifier.class);
+ job.setNumReduceTasks(0);
+ LOG.debug("submitting job.");
+ final RunningJob run = JobClient.runJob(job);
+ assertTrue("job failed!", run.isSuccessful());
+ assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, run.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getCounter());
+ assertEquals("Saw an instance of the filtered-out row.", 0, run.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getCounter());
+ assertEquals("Saw the wrong number of instances of columnA.", 1, run.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getCounter());
+ assertEquals("Saw the wrong number of instances of columnB.", 1, run.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getCounter());
+ assertEquals("Saw the wrong count of values for the filtered-for row.", 2, run.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getCounter());
+ assertEquals("Saw the wrong count of values for the filtered-out row.", 0, run.getCounters()
+ .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getCounter());
+ }
+
+ public static class ExampleVerifier implements TableMap<NullWritable, NullWritable> {
+
+ @Override
+ public void configure(JobConf conf) {
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result value,
+ OutputCollector<NullWritable,NullWritable> output,
+ Reporter reporter) throws IOException {
+ for (Cell cell : value.listCells()) {
+ reporter.getCounter(TestTableInputFormat.class.getName() + ":row",
+ Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
+ .increment(1L);
+ reporter.getCounter(TestTableInputFormat.class.getName() + ":family",
+ Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
+ .increment(1L);
+ reporter.getCounter(TestTableInputFormat.class.getName() + ":value",
+ Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
+ .increment(1L);
+ }
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+
+ public static class ExampleDeprecatedTIF extends TableInputFormatBase implements JobConfigurable {
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ Connection connection = ConnectionFactory.createConnection(job);
+ Table exampleTable = connection.getTable(TableName.valueOf("exampleDeprecatedTable"));
+ // mandatory
+ initializeTable(connection, exampleTable.getName());
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ // mandatory
+ setInputColumns(inputColumns);
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ // optional
+ setRowFilter(exampleFilter);
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to configure for job.", exception);
+ }
+ }
+
+ }
+
+ public static class ExampleJobConfigurableTIF extends ExampleTIF implements JobConfigurable {
+
+ @Override
+ public void configure(JobConf job) {
+ try {
+ initialize(job);
+ } catch (IOException exception) {
+ throw new RuntimeException("Failed to initialize.", exception);
+ }
+ }
+
+ @Override
+ protected void initialize(JobConf job) throws IOException {
+ initialize(job, "exampleJobConfigurableTable");
+ }
+ }
+
+
+ public static class ExampleTIF extends TableInputFormatBase {
+
+ @Override
+ protected void initialize(JobConf job) throws IOException {
+ initialize(job, "exampleTable");
+ }
+
+ protected void initialize(JobConf job, String table) throws IOException {
+ Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
+ TableName tableName = TableName.valueOf(table);
+ // mandatory
+ initializeTable(connection, tableName);
+ byte[][] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
+ Bytes.toBytes("columnB") };
+ // mandatory
+ setInputColumns(inputColumns);
+ Filter exampleFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("aa.*"));
+ // optional
+ setRowFilter(exampleFilter);
+ }
+
+ }
+
+}
+
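For context on the extension tests above, the following is a minimal client-side sketch of the pattern they verify: subclassing the mapred TableInputFormatBase and binding it to a table, as ExampleDeprecatedTIF does. The class name MyTIF, the table name "myTable", and the family "d" are illustrative assumptions, not names from this commit.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.mapred.TableInputFormatBase;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;

    public class MyTIF extends TableInputFormatBase implements JobConfigurable {
      @Override
      public void configure(JobConf job) {
        try {
          // Connect and bind the input format to the table; both calls are
          // mandatory, exactly as in ExampleDeprecatedTIF above.
          Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
          initializeTable(connection, TableName.valueOf("myTable"));
          setInputColumns(new byte[][] { Bytes.toBytes("d") });
        } catch (IOException exception) {
          throw new RuntimeException("Failed to configure for job.", exception);
        }
      }
    }

A driver would then wire it the same way testInputFormat does: set MyTIF via job.setInputFormat, register a TableMap mapper, use zero reducers, and submit with JobClient.runJob(job).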
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
new file mode 100644
index 0000000..3f905cf
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TestTableMapReduceBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category({MapReduceTests.class, LargeTests.class})
+@SuppressWarnings("deprecation")
+public class TestTableMapReduce extends TestTableMapReduceBase {
+ private static final Log LOG =
+ LogFactory.getLog(TestTableMapReduce.class.getName());
+
+ protected Log getLog() { return LOG; }
+
+ /**
+ * Pass the given key and the processed record on to reduce.
+ */
+ static class ProcessContentsMapper extends MapReduceBase implements
+ TableMap<ImmutableBytesWritable, Put> {
+
+ /**
+ * Pass the key and the reversed value to reduce.
+ */
+ public void map(ImmutableBytesWritable key, Result value,
+ OutputCollector<ImmutableBytesWritable, Put> output,
+ Reporter reporter)
+ throws IOException {
+ output.collect(key, TestTableMapReduceBase.map(key, value));
+ }
+ }
+
+ @Override
+ protected void runTestOnTable(Table table) throws IOException {
+ JobConf jobConf = null;
+ try {
+ LOG.info("Before map/reduce startup");
+ jobConf = new JobConf(UTIL.getConfiguration(), TestTableMapReduce.class);
+ jobConf.setJobName("process column contents");
+ jobConf.setNumReduceTasks(1);
+ TableMapReduceUtil.initTableMapJob(table.getName().getNameAsString(),
+ Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
+ ImmutableBytesWritable.class, Put.class, jobConf);
+ TableMapReduceUtil.initTableReduceJob(table.getName().getNameAsString(),
+ IdentityTableReduce.class, jobConf);
+
+ LOG.info("Started " + table.getName());
+ RunningJob job = JobClient.runJob(jobConf);
+ assertTrue(job.isSuccessful());
+ LOG.info("After map/reduce completion");
+
+ // verify map-reduce results
+ verify(table.getName());
+ } finally {
+ if (jobConf != null) {
+ FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+ }
+ }
+ }
+}
+
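The shared map helper lives in TestTableMapReduceBase, which is not part of this hunk. Purely as an illustration of the reversal step the class javadoc describes, the transformation amounts to the sketch below; the family name "contents" and the null qualifier are assumptions, not the base class's actual constants.

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class ReverseValueSketch {
      private static final byte[] FAMILY = Bytes.toBytes("contents"); // assumed family name

      // Reverse the bytes of one cell and emit a Put for the same row.
      static Put reverse(ImmutableBytesWritable key, Result value) {
        byte[] original = value.getValue(FAMILY, null);
        byte[] reversed = new byte[original.length];
        for (int i = 0; i < original.length; i++) {
          reversed[i] = original[original.length - 1 - i];
        }
        return new Put(key.get()).addColumn(FAMILY, null, reversed);
      }
    }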
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
new file mode 100644
index 0000000..ac2f20d
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
@@ -0,0 +1,272 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestTableMapReduceUtil {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestTableMapReduceUtil.class);
+
+ private static Table presidentsTable;
+ private static final String TABLE_NAME = "People";
+
+ private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+ private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
+
+ private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
+ "president1", "president2", "president3");
+ private static Iterator<String> presidentNames = ImmutableSet.of(
+ "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
+
+ private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
+ "actor2");
+ private static Iterator<String> actorNames = ImmutableSet.of(
+ "Jack Nicholson", "Martin Freeman").iterator();
+
+ private static String PRESIDENT_PATTERN = "president";
+ private static String ACTOR_PATTERN = "actor";
+ private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
+ .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ presidentsTable = createAndFillTable(TableName.valueOf(TABLE_NAME));
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws IOException {
+ LOG.info("before");
+ UTIL.ensureSomeRegionServersAvailable(1);
+ LOG.info("before done");
+ }
+
+ public static Table createAndFillTable(TableName tableName) throws IOException {
+ Table table = UTIL.createTable(tableName, COLUMN_FAMILY);
+ createPutCommand(table);
+ return table;
+ }
+
+ private static void createPutCommand(Table table) throws IOException {
+ for (String president : presidentsRowKeys) {
+ if (presidentNames.hasNext()) {
+ Put p = new Put(Bytes.toBytes(president));
+ p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(presidentNames.next()));
+ table.put(p);
+ }
+ }
+
+ for (String actor : actorsRowKeys) {
+ if (actorNames.hasNext()) {
+ Put p = new Put(Bytes.toBytes(actor));
+ p.addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
+ table.put(p);
+ }
+ }
+ }
+
+ /**
+ * Check that the given number of reduce tasks for the given job configuration
+ * does not exceed the number of regions for the given table.
+ */
+ @Test
+ public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
+ throws IOException {
+ Assert.assertNotNull(presidentsTable);
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.setScannerCaching(jobConf, 100);
+ assertEquals(1, jobConf.getNumReduceTasks());
+ assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
+
+ jobConf.setNumReduceTasks(10);
+ TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+ assertEquals(1, jobConf.getNumReduceTasks());
+ }
+
+ @Test
+ public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
+ throws IOException {
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+ assertEquals(1, jobConf.getNumMapTasks());
+
+ jobConf.setNumMapTasks(10);
+ TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+ assertEquals(1, jobConf.getNumMapTasks());
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldBeValidMapReduceEvaluation() throws Exception {
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ try {
+ jobConf.setJobName("process row task");
+ jobConf.setNumReduceTasks(1);
+ TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
+ ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
+ jobConf);
+ TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+ ClassificatorRowReduce.class, jobConf);
+ RunningJob job = JobClient.runJob(jobConf);
+ assertTrue(job.isSuccessful());
+ } finally {
+ if (jobConf != null) {
+ FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldBeValidMapReduceWithPartitionerEvaluation()
+ throws IOException {
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ try {
+ jobConf.setJobName("process row task");
+ jobConf.setNumReduceTasks(2);
+ TableMapReduceUtil.initTableMapJob(TABLE_NAME, Bytes.toString(COLUMN_FAMILY),
+ ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
+ jobConf);
+
+ TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+ ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
+ RunningJob job = JobClient.runJob(jobConf);
+ assertTrue(job.isSuccessful());
+ } finally {
+ if (jobConf != null) {
+ FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ static class ClassificatorRowReduce extends MapReduceBase implements
+ TableReduce<ImmutableBytesWritable, Put> {
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+ OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
+ throws IOException {
+ String strKey = Bytes.toString(key.get());
+ List<Put> result = new ArrayList<>();
+ while (values.hasNext()) {
+ result.add(values.next());
+ }
+
+ if (relation.containsKey(strKey)) {
+ Set<String> set = relation.get(strKey);
+ if (set != null) {
+ assertEquals(set.size(), result.size());
+ } else {
+ throwAssertionError("Test infrastructure error: set is null");
+ }
+ } else {
+ throwAssertionError("Test infrastructure error: key not found in map");
+ }
+ }
+
+ private void throwAssertionError(String errorMessage) throws AssertionError {
+ throw new AssertionError(errorMessage);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ static class ClassificatorMapper extends MapReduceBase implements
+ TableMap<ImmutableBytesWritable, Put> {
+
+ @Override
+ public void map(ImmutableBytesWritable row, Result result,
+ OutputCollector<ImmutableBytesWritable, Put> outCollector,
+ Reporter reporter) throws IOException {
+ String rowKey = Bytes.toString(result.getRow());
+ final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
+ Bytes.toBytes(PRESIDENT_PATTERN));
+ final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
+ Bytes.toBytes(ACTOR_PATTERN));
+ ImmutableBytesWritable outKey = null;
+
+ if (rowKey.startsWith(PRESIDENT_PATTERN)) {
+ outKey = pKey;
+ } else if (rowKey.startsWith(ACTOR_PATTERN)) {
+ outKey = aKey;
+ } else {
+ throw new AssertionError("unexpected rowKey");
+ }
+
+ String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
+ COLUMN_QUALIFIER));
+ outCollector.collect(outKey,
+ new Put(Bytes.toBytes("rowKey2"))
+ .addColumn(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
+ }
+ }
+}
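For reference, the behaviour the limit tests above assert can be sketched from the driver side as follows. This is a minimal sketch, assuming a table named "People" with a single region as in the fixture above and an hbase-site.xml on the classpath; nothing else is taken from the commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobConf;

    public class LimitTasksSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        JobConf job = new JobConf(conf);
        job.setNumReduceTasks(10);
        job.setNumMapTasks(10);
        // Cap parallelism at the table's region count; with a single region both
        // counts drop to 1, which is what the tests above assert.
        TableMapReduceUtil.limitNumReduceTasks("People", job);
        TableMapReduceUtil.limitNumMapTasks("People", job);
        // Scanner caching for the job's scans, surfaced as hbase.client.scanner.caching.
        TableMapReduceUtil.setScannerCaching(job, 100);
        System.out.println(job.getNumReduceTasks() + " reducers, " + job.getNumMapTasks() + " mappers");
      }
    }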