You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2013/10/31 19:08:23 UTC
svn commit: r1537567 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapred/
test/java/org/apache/hadoop/hbase/mapred/
Author: ddas
Date: Thu Oct 31 18:08:22 2013
New Revision: 1537567
URL: http://svn.apache.org/r1537567
Log:
HBASE-8611. Improve test coverage in pkg org.apache.hadoop.hbase.mapred
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java?rev=1537567&r1=1537566&r2=1537567&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java Thu Oct 31 18:08:22 2013
@@ -21,24 +21,31 @@ package org.apache.hadoop.hbase.mapred;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.ProgramDriver;
+import com.google.common.annotations.VisibleForTesting;
/**
- * Driver for hbase mapreduce jobs. Select which to run by passing
- * name of job to this main.
+ * Driver for hbase mapreduce jobs. Select which to run by passing name of job
+ * to this main.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Driver {
+
+ private static ProgramDriver pgd = new ProgramDriver();
+
+ @VisibleForTesting
+ static void setProgramDriver(ProgramDriver pgd0) {
+ pgd = pgd0;
+ }
+
/**
* @param args
* @throws Throwable
*/
public static void main(String[] args) throws Throwable {
- ProgramDriver pgd = new ProgramDriver();
- pgd.addClass(RowCounter.NAME, RowCounter.class,
- "Count rows in HBase table");
- ProgramDriver.class.getMethod("driver", new Class [] {String[].class}).
- invoke(pgd, new Object[]{args});
+ pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table");
+ ProgramDriver.class.getMethod("driver", new Class[] { String[].class })
+ .invoke(pgd, new Object[] { args });
}
}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.util.ProgramDriver;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@Category(SmallTests.class)
+public class TestDriver {
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testDriverMainMethod() throws Throwable {
+ ProgramDriver programDriverMock = mock(ProgramDriver.class);
+ Driver.setProgramDriver(programDriverMock);
+ Driver.main(new String[]{});
+ verify(programDriverMock).driver(Mockito.any(String[].class));
+ }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,178 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.ImmutableList;
+
+@Category(SmallTests.class)
+public class TestGroupingTableMap {
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes()
+ throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ Result result = mock(Result.class);
+ Reporter reporter = mock(Reporter.class);
+ gTableMap = new GroupingTableMap();
+ Configuration cfg = new Configuration();
+ cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+ JobConf jobConf = new JobConf(cfg);
+ gTableMap.configure(jobConf);
+
+ byte[] row = {};
+ List<Cell> keyValues = ImmutableList.<Cell>of(
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("2222")),
+ new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("3333")));
+ when(result.listCells()).thenReturn(keyValues);
+ OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+ mock(OutputCollector.class);
+ gTableMap.map(null, result, outputCollectorMock, reporter);
+ verify(result).listCells();
+ verifyZeroInteractions(outputCollectorMock);
+ } finally {
+ if (gTableMap != null)
+ gTableMap.close();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ Result result = mock(Result.class);
+ Reporter reporter = mock(Reporter.class);
+ gTableMap = new GroupingTableMap();
+ Configuration cfg = new Configuration();
+ cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+ JobConf jobConf = new JobConf(cfg);
+ gTableMap.configure(jobConf);
+
+ byte[] row = {};
+ List<Cell> keyValues = ImmutableList.<Cell>of(
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), Bytes.toBytes("1111")),
+ new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), Bytes.toBytes("2222")),
+ new KeyValue(row, "familyC".getBytes(), "qualifierC".getBytes(), Bytes.toBytes("3333")));
+ when(result.listCells()).thenReturn(keyValues);
+ OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+ mock(OutputCollector.class);
+ gTableMap.map(null, result, outputCollectorMock, reporter);
+ verify(result).listCells();
+ verify(outputCollectorMock, times(1))
+ .collect(any(ImmutableBytesWritable.class), any(Result.class));
+ verifyNoMoreInteractions(outputCollectorMock);
+ } finally {
+ if (gTableMap != null)
+ gTableMap.close();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation" })
+ public void shouldCreateNewKey() throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ Result result = mock(Result.class);
+ Reporter reporter = mock(Reporter.class);
+ final byte[] bSeparator = Bytes.toBytes(" ");
+ gTableMap = new GroupingTableMap();
+ Configuration cfg = new Configuration();
+ cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+ JobConf jobConf = new JobConf(cfg);
+ gTableMap.configure(jobConf);
+
+ final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
+ final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
+ byte[] row = {};
+ List<Cell> cells = ImmutableList.<Cell>of(
+ new KeyValue(row, "familyA".getBytes(), "qualifierA".getBytes(), firstPartKeyValue),
+ new KeyValue(row, "familyB".getBytes(), "qualifierB".getBytes(), secondPartKeyValue));
+ when(result.listCells()).thenReturn(cells);
+
+ final AtomicBoolean outputCollected = new AtomicBoolean();
+ OutputCollector<ImmutableBytesWritable, Result> outputCollector =
+ new OutputCollector<ImmutableBytesWritable, Result>() {
+ @Override
+ public void collect(ImmutableBytesWritable arg, Result result) throws IOException {
+ assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartKeyValue, bSeparator,
+ secondPartKeyValue), arg.copyBytes());
+ outputCollected.set(true);
+ }
+ };
+
+ gTableMap.map(null, result, outputCollector, reporter);
+ verify(result).listCells();
+ Assert.assertTrue("Output not received", outputCollected.get());
+
+ final byte[] firstPartValue = Bytes.toBytes("238947928");
+ final byte[] secondPartValue = Bytes.toBytes("4678456942345");
+ byte[][] data = { firstPartValue, secondPartValue };
+ ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
+ assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartValue,
+ bSeparator, secondPartValue), byteWritable.get());
+ } finally {
+ if (gTableMap != null)
+ gTableMap.close();
+ }
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation" })
+ public void shouldReturnNullFromCreateGroupKey() throws Exception {
+ GroupingTableMap gTableMap = null;
+ try {
+ gTableMap = new GroupingTableMap();
+ assertNull(gTableMap.createGroupKey(null));
+ } finally {
+ if(gTableMap != null)
+ gTableMap.close();
+ }
+ }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category(SmallTests.class)
+public class TestIdentityTableMap {
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldCollectPredefinedTimes() throws IOException {
+ int recordNumber = 999;
+ Result resultMock = mock(Result.class);
+ IdentityTableMap identityTableMap = null;
+ try {
+ Reporter reporterMock = mock(Reporter.class);
+ identityTableMap = new IdentityTableMap();
+ ImmutableBytesWritable bytesWritableMock = mock(ImmutableBytesWritable.class);
+ OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+ mock(OutputCollector.class);
+
+ for (int i = 0; i < recordNumber; i++)
+ identityTableMap.map(bytesWritableMock, resultMock, outputCollectorMock,
+ reporterMock);
+
+ verify(outputCollectorMock, times(recordNumber)).collect(
+ Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
+ } finally {
+ if (identityTableMap != null)
+ identityTableMap.close();
+ }
+ }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+
+@Category(SmallTests.class)
+public class TestRowCounter {
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldPrintUsage() throws Exception {
+ String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
+ String result = new OutputReader(System.out) {
+ @Override
+ void doRead() {
+ assertEquals(-1, RowCounter.printUsage());
+ }
+ }.read();
+
+ assertTrue(result.startsWith(expectedOutput));
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree()
+ throws Exception {
+ final String[] args = new String[] { "one", "two" };
+ String line = "ERROR: Wrong number of parameters: " + args.length;
+ String result = new OutputReader(System.err) {
+ @Override
+ void doRead() throws Exception {
+ assertEquals(-1, new RowCounter().run(args));
+ }
+ }.read();
+
+ assertTrue(result.startsWith(line));
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation", "unchecked" })
+ public void shouldRegInReportEveryIncomingRow() throws IOException {
+ int iterationNumber = 999;
+ RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
+ Reporter reporter = mock(Reporter.class);
+ for (int i = 0; i < iterationNumber; i++)
+ mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
+ mock(OutputCollector.class), reporter);
+
+ Mockito.verify(reporter, times(iterationNumber)).incrCounter(
+ any(Enum.class), anyInt());
+ }
+
+ @Test
+ @SuppressWarnings({ "deprecation" })
+ public void shouldCreateAndRunSubmittableJob() throws Exception {
+ RowCounter rCounter = new RowCounter();
+ rCounter.setConf(HBaseConfiguration.create());
+ String[] args = new String[] { "\temp", "tableA", "column1", "column2",
+ "column3" };
+ JobConf jobConfig = rCounter.createSubmittableJob(args);
+
+ assertNotNull(jobConfig);
+ assertEquals(0, jobConfig.getNumReduceTasks());
+ assertEquals("rowcounter", jobConfig.getJobName());
+ assertEquals(jobConfig.getMapOutputValueClass(), Result.class);
+ assertEquals(jobConfig.getMapperClass(), RowCounterMapper.class);
+ assertEquals(jobConfig.get(TableInputFormat.COLUMN_LIST), Joiner.on(' ')
+ .join("column1", "column2", "column3"));
+ assertEquals(jobConfig.getMapOutputKeyClass(), ImmutableBytesWritable.class);
+ }
+
+ enum Outs {
+ OUT, ERR
+ }
+
+ private static abstract class OutputReader {
+ private final PrintStream ps;
+ private PrintStream oldPrintStream;
+ private Outs outs;
+
+ protected OutputReader(PrintStream ps) {
+ this.ps = ps;
+ }
+
+ protected String read() throws Exception {
+ ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+ if (ps == System.out) {
+ oldPrintStream = System.out;
+ outs = Outs.OUT;
+ System.setOut(new PrintStream(outBytes));
+ } else if (ps == System.err) {
+ oldPrintStream = System.err;
+ outs = Outs.ERR;
+ System.setErr(new PrintStream(outBytes));
+ } else {
+ throw new IllegalStateException("OutputReader: unsupported PrintStream");
+ }
+
+ try {
+ doRead();
+ return new String(outBytes.toByteArray());
+ } finally {
+ switch (outs) {
+ case OUT: {
+ System.setOut(oldPrintStream);
+ break;
+ }
+ case ERR: {
+ System.setErr(oldPrintStream);
+ break;
+ }
+ default:
+ throw new IllegalStateException(
+ "OutputReader: unsupported PrintStream");
+ }
+ }
+ }
+
+ abstract void doRead() throws Exception;
+ }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestSplitTable {
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testSplitTableCompareTo() {
+ TableSplit aTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+ Bytes.toBytes("aaa"), Bytes.toBytes("ddd"), "locationA");
+
+ TableSplit bTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+ Bytes.toBytes("iii"), Bytes.toBytes("kkk"), "locationA");
+
+ TableSplit cTableSplit = new TableSplit(Bytes.toBytes("tableA"),
+ Bytes.toBytes("lll"), Bytes.toBytes("zzz"), "locationA");
+
+ assertTrue(aTableSplit.compareTo(aTableSplit) == 0);
+ assertTrue(bTableSplit.compareTo(bTableSplit) == 0);
+ assertTrue(cTableSplit.compareTo(cTableSplit) == 0);
+
+ assertTrue(aTableSplit.compareTo(bTableSplit) < 0);
+ assertTrue(bTableSplit.compareTo(aTableSplit) > 0);
+
+ assertTrue(aTableSplit.compareTo(cTableSplit) < 0);
+ assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
+
+ assertTrue(bTableSplit.compareTo(cTableSplit) < 0);
+ assertTrue(cTableSplit.compareTo(bTableSplit) > 0);
+
+ assertTrue(cTableSplit.compareTo(aTableSplit) > 0);
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testSplitTableEquals() {
+ assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
+ .toBytes("tableB"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"),
+ "locationA")));
+
+ assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
+ .toBytes("tableA"), Bytes.toBytes("bbb"), Bytes.toBytes("ddd"),
+ "locationA")));
+
+ assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
+ .toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("eee"),
+ "locationA")));
+
+ assertFalse(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
+ .toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"),
+ "locationB")));
+
+ assertTrue(new TableSplit(Bytes.toBytes("tableA"), Bytes.toBytes("aaa"),
+ Bytes.toBytes("ddd"), "locationA").equals(new TableSplit(Bytes
+ .toBytes("tableA"), Bytes.toBytes("aaa"), Bytes.toBytes("ddd"),
+ "locationA")));
+ }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,274 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+@Category(LargeTests.class)
+public class TestTableMapReduceUtil {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestTableMapReduceUtil.class);
+
+ private static HTable presidentsTable;
+ private static final String TABLE_NAME = "People";
+
+ private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+ private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
+
+ private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
+ "president1", "president2", "president3");
+ private static Iterator<String> presidentNames = ImmutableSet.of(
+ "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
+
+ private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
+ "actor2");
+ private static Iterator<String> actorNames = ImmutableSet.of(
+ "Jack Nicholson", "Martin Freeman").iterator();
+
+ private static String PRESIDENT_PATTERN = "president";
+ private static String ACTOR_PATTERN = "actor";
+ private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
+ .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ presidentsTable = createAndFillTable(Bytes.toBytes(TABLE_NAME));
+ UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniMapReduceCluster();
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws IOException {
+ LOG.info("before");
+ UTIL.ensureSomeRegionServersAvailable(1);
+ LOG.info("before done");
+ }
+
+ public static HTable createAndFillTable(byte[] tableName) throws IOException {
+ HTable table = UTIL.createTable(tableName, COLUMN_FAMILY);
+ createPutCommand(table);
+ return table;
+ }
+
+ private static void createPutCommand(HTable table) throws IOException {
+ for (String president : presidentsRowKeys) {
+ if (presidentNames.hasNext()) {
+ Put p = new Put(Bytes.toBytes(president));
+ p.add(COLUMN_FAMILY, COLUMN_QUALIFIER,
+ Bytes.toBytes(presidentNames.next()));
+ table.put(p);
+ }
+ }
+
+ for (String actor : actorsRowKeys) {
+ if (actorNames.hasNext()) {
+ Put p = new Put(Bytes.toBytes(actor));
+ p.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(actorNames.next()));
+ table.put(p);
+ }
+ }
+ }
+
+ /**
+ * Check what the given number of reduce tasks for the given job configuration
+ * does not exceed the number of regions for the given table.
+ */
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
+ throws IOException {
+ Assert.assertNotNull(presidentsTable);
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.setScannerCaching(jobConf, 100);
+ assertEquals(1, jobConf.getNumReduceTasks());
+ assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
+
+ jobConf.setNumReduceTasks(10);
+ TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+ assertEquals(1, jobConf.getNumReduceTasks());
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
+ throws IOException {
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+ assertEquals(1, jobConf.getNumMapTasks());
+
+ jobConf.setNumMapTasks(10);
+ TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+ TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+ assertEquals(1, jobConf.getNumMapTasks());
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shoudBeValidMapReduceEvaluation() throws Exception {
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ try {
+ jobConf.setJobName("process row task");
+ jobConf.setNumReduceTasks(1);
+ TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
+ ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
+ jobConf);
+ TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+ ClassificatorRowReduce.class, jobConf);
+ RunningJob job = JobClient.runJob(jobConf);
+ assertTrue(job.isSuccessful());
+ } finally {
+ if (jobConf != null)
+ FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+ }
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void shoudBeValidMapReduceWithPartitionerEvaluation()
+ throws IOException {
+ Configuration cfg = UTIL.getConfiguration();
+ JobConf jobConf = new JobConf(cfg);
+ try {
+ jobConf.setJobName("process row task");
+ jobConf.setNumReduceTasks(2);
+ TableMapReduceUtil.initTableMapJob(TABLE_NAME, new String(COLUMN_FAMILY),
+ ClassificatorMapper.class, ImmutableBytesWritable.class, Put.class,
+ jobConf);
+
+ TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+ ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
+ RunningJob job = JobClient.runJob(jobConf);
+ assertTrue(job.isSuccessful());
+ } finally {
+ if (jobConf != null)
+ FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ static class ClassificatorRowReduce extends MapReduceBase implements
+ TableReduce<ImmutableBytesWritable, Put> {
+
+ @Override
+ public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+ OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
+ throws IOException {
+ String strKey = Bytes.toString(key.get());
+ List<Put> result = new ArrayList<Put>();
+ while (values.hasNext())
+ result.add(values.next());
+
+ if (relation.keySet().contains(strKey)) {
+ Set<String> set = relation.get(strKey);
+ if (set != null) {
+ assertEquals(set.size(), result.size());
+ } else {
+ throwAccertionError("Test infrastructure error: set is null");
+ }
+ } else {
+ throwAccertionError("Test infrastructure error: key not found in map");
+ }
+ }
+
+ private void throwAccertionError(String errorMessage) throws AssertionError {
+ throw new AssertionError(errorMessage);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ static class ClassificatorMapper extends MapReduceBase implements
+ TableMap<ImmutableBytesWritable, Put> {
+
+ @Override
+ public void map(ImmutableBytesWritable row, Result result,
+ OutputCollector<ImmutableBytesWritable, Put> outCollector,
+ Reporter reporter) throws IOException {
+ String rowKey = Bytes.toString(result.getRow());
+ final ImmutableBytesWritable pKey = new ImmutableBytesWritable(
+ Bytes.toBytes(PRESIDENT_PATTERN));
+ final ImmutableBytesWritable aKey = new ImmutableBytesWritable(
+ Bytes.toBytes(ACTOR_PATTERN));
+ ImmutableBytesWritable outKey = null;
+
+ if (rowKey.startsWith(PRESIDENT_PATTERN)) {
+ outKey = pKey;
+ } else if (rowKey.startsWith(ACTOR_PATTERN)) {
+ outKey = aKey;
+ } else {
+ throw new AssertionError("unexpected rowKey");
+ }
+
+ String name = Bytes.toString(result.getValue(COLUMN_FAMILY,
+ COLUMN_QUALIFIER));
+ outCollector.collect(outKey, new Put(Bytes.toBytes("rowKey2")).add(
+ COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(name)));
+ }
+ }
+}