You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2013/10/31 19:08:23 UTC

svn commit: r1537567 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/mapred/ test/java/org/apache/hadoop/hbase/mapred/

Author: ddas
Date: Thu Oct 31 18:08:22 2013
New Revision: 1537567

URL: http://svn.apache.org/r1537567
Log:
HBASE-8611. Improve test coverage in pkg org.apache.hadoop.hbase.mapred

Added:
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java?rev=1537567&r1=1537566&r2=1537567&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/Driver.java Thu Oct 31 18:08:22 2013
@@ -21,24 +21,31 @@ package org.apache.hadoop.hbase.mapred;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.ProgramDriver;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Driver for hbase mapreduce jobs. Select which to run by passing
- * name of job to this main.
+ * Driver for hbase mapreduce jobs. Select which to run by passing name of job
+ * to this main.
  */
 @Deprecated
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Driver {
+  /** Driver that main() registers jobs on; static and non-final so a test can replace it. */
+  private static ProgramDriver pgd = new ProgramDriver();
+  /** Replaces the ProgramDriver used by main(); it stays in place until this is called again. */
+  @VisibleForTesting
+  static void setProgramDriver(ProgramDriver pgd0) {    
+    pgd = pgd0;
+  }
+
   /**
    * @param args
    * @throws Throwable
    */
   public static void main(String[] args) throws Throwable {
-    ProgramDriver pgd = new ProgramDriver();
-    pgd.addClass(RowCounter.NAME, RowCounter.class,
-      "Count rows in HBase table");
-    ProgramDriver.class.getMethod("driver", new Class [] {String[].class}).
-      invoke(pgd, new Object[]{args});
+    pgd.addClass(RowCounter.NAME, RowCounter.class, "Count rows in HBase table");
+    ProgramDriver.class.getMethod("driver", new Class[] { String[].class })
+        .invoke(pgd, new Object[] { args });    
   }
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestDriver.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.util.ProgramDriver;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@Category(SmallTests.class)
+public class TestDriver {
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testDriverMainMethod() throws Throwable {
+    ProgramDriver programDriverMock = mock(ProgramDriver.class);
+    Driver.setProgramDriver(programDriverMock);
+    Driver.main(new String[] {});
+    Driver.setProgramDriver(new ProgramDriver()); // static field: un-install the mock first
+    verify(programDriverMock).driver(Mockito.any(String[].class)); // main() must reach driver()
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestGroupingTableMap.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,178 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.ImmutableList;
+
+@Category(SmallTests.class)
+public class TestGroupingTableMap {
+
+  /** Separator that the expected group keys place between the grouped values. */
+  private static final byte[] SEPARATOR = Bytes.toBytes(" ");
+
+  /** Returns a mapper configured to group on familyA:qualifierA and familyB:qualifierB. */
+  @SuppressWarnings("deprecation")
+  private static GroupingTableMap newConfiguredMap() {
+    Configuration cfg = new Configuration();
+    cfg.set(GroupingTableMap.GROUP_COLUMNS, "familyA:qualifierA familyB:qualifierB");
+    GroupingTableMap gTableMap = new GroupingTableMap();
+    gTableMap.configure(new JobConf(cfg));
+    return gTableMap;
+  }
+
+  /**
+   * Builds a cell in an empty row. Family and qualifier are encoded with Bytes.toBytes
+   * instead of String.getBytes(), which would depend on the platform default charset.
+   */
+  private static Cell cell(String family, String qualifier, byte[] value) {
+    return new KeyValue(new byte[0], Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
+  }
+
+  /** Returns a mocked Result whose listCells() yields the given cells in order. */
+  private static Result resultOf(Cell... cells) {
+    Result result = mock(Result.class);
+    when(result.listCells()).thenReturn(ImmutableList.copyOf(cells));
+    return result;
+  }
+
+  /** A grouping column that occurs twice in the row must keep map() from collecting anything. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldNotCallCollectonSinceFindUniqueKeyValueMoreThanOnes() throws Exception {
+    GroupingTableMap gTableMap = newConfiguredMap();
+    try {
+      Result result = resultOf(
+          cell("familyA", "qualifierA", Bytes.toBytes("1111")),
+          cell("familyA", "qualifierA", Bytes.toBytes("2222")),
+          cell("familyB", "qualifierB", Bytes.toBytes("3333")));
+      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+          mock(OutputCollector.class);
+      gTableMap.map(null, result, outputCollectorMock, mock(Reporter.class));
+
+      verify(result).listCells();
+      verifyZeroInteractions(outputCollectorMock);
+    } finally {
+      gTableMap.close();
+    }
+  }
+
+  /** A cell outside the grouping columns must not keep map() from collecting exactly once. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldCreateNewKeyAlthoughExtraKey() throws Exception {
+    GroupingTableMap gTableMap = newConfiguredMap();
+    try {
+      Result result = resultOf(
+          cell("familyA", "qualifierA", Bytes.toBytes("1111")),
+          cell("familyB", "qualifierB", Bytes.toBytes("2222")),
+          cell("familyC", "qualifierC", Bytes.toBytes("3333")));
+      OutputCollector<ImmutableBytesWritable, Result> outputCollectorMock =
+          mock(OutputCollector.class);
+      gTableMap.map(null, result, outputCollectorMock, mock(Reporter.class));
+
+      verify(result).listCells();
+      verify(outputCollectorMock, times(1))
+          .collect(any(ImmutableBytesWritable.class), any(Result.class));
+      verifyNoMoreInteractions(outputCollectorMock);
+    } finally {
+      gTableMap.close();
+    }
+  }
+
+  /**
+   * map() must emit the two grouped values joined by a single space, and createGroupKey()
+   * must join raw values the same way.
+   */
+  @Test
+  @SuppressWarnings({ "deprecation" })
+  public void shouldCreateNewKey() throws Exception {
+    GroupingTableMap gTableMap = newConfiguredMap();
+    try {
+      final byte[] firstPartKeyValue = Bytes.toBytes("34879512738945");
+      final byte[] secondPartKeyValue = Bytes.toBytes("35245142671437");
+      Result result = resultOf(
+          cell("familyA", "qualifierA", firstPartKeyValue),
+          cell("familyB", "qualifierB", secondPartKeyValue));
+
+      final AtomicBoolean outputCollected = new AtomicBoolean();
+      OutputCollector<ImmutableBytesWritable, Result> outputCollector =
+          new OutputCollector<ImmutableBytesWritable, Result>() {
+        @Override
+        public void collect(ImmutableBytesWritable key, Result value) throws IOException {
+          assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartKeyValue,
+              SEPARATOR, secondPartKeyValue), key.copyBytes());
+          outputCollected.set(true);
+        }
+      };
+
+      gTableMap.map(null, result, outputCollector, mock(Reporter.class));
+      verify(result).listCells();
+      Assert.assertTrue("Output not received", outputCollected.get());
+
+      final byte[] firstPartValue = Bytes.toBytes("238947928");
+      final byte[] secondPartValue = Bytes.toBytes("4678456942345");
+      byte[][] data = { firstPartValue, secondPartValue };
+      ImmutableBytesWritable byteWritable = gTableMap.createGroupKey(data);
+      assertArrayEquals(com.google.common.primitives.Bytes.concat(firstPartValue,
+          SEPARATOR, secondPartValue), byteWritable.get());
+    } finally {
+      gTableMap.close();
+    }
+  }
+
+  /** createGroupKey(null) must return null. */
+  @Test
+  @SuppressWarnings({ "deprecation" })
+  public void shouldReturnNullFromCreateGroupKey() throws Exception {
+    GroupingTableMap gTableMap = new GroupingTableMap();
+    try {
+      assertNull(gTableMap.createGroupKey(null));
+    } finally {
+      gTableMap.close();
+    }
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestIdentityTableMap.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,63 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category(SmallTests.class)
+public class TestIdentityTableMap {
+
+  /** 999 map() calls on an IdentityTableMap must yield 999 collect() calls on the collector. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldCollectPredefinedTimes() throws IOException {
+    final int expectedCalls = 999;
+    final Result value = mock(Result.class);
+    final IdentityTableMap mapper = new IdentityTableMap();
+    try {
+      final ImmutableBytesWritable key = mock(ImmutableBytesWritable.class);
+      final Reporter reporter = mock(Reporter.class);
+      final OutputCollector<ImmutableBytesWritable, Result> collector =
+          mock(OutputCollector.class);
+
+      int remaining = expectedCalls;
+      while (remaining-- > 0) {
+        mapper.map(key, value, collector, reporter);
+      }
+
+      verify(collector, times(expectedCalls))
+          .collect(Mockito.any(ImmutableBytesWritable.class), Mockito.any(Result.class));
+    } finally {
+      mapper.close();
+    }
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestRowCounter.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.RowCounter.RowCounterMapper;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+
+@Category(SmallTests.class)
+public class TestRowCounter {
+
+  /** printUsage() must return -1 and begin its System.out output with the usage line. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shouldPrintUsage() throws Exception {
+    String expectedOutput = "rowcounter <outputdir> <tablename> <column1> [<column2>...]";
+    String result = new OutputReader(System.out) {
+      @Override
+      void doRead() {
+        assertEquals(-1, RowCounter.printUsage());
+      }
+    }.read();
+
+    assertTrue(result.startsWith(expectedOutput));
+  }
+
+  /** run() with only two arguments must return -1 and report the count on System.err. */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shouldExitAndPrintUsageSinceParameterNumberLessThanThree() throws Exception {
+    final String[] args = new String[] { "one", "two" };
+    String line = "ERROR: Wrong number of parameters: " + args.length;
+    String result = new OutputReader(System.err) {
+      @Override
+      void doRead() throws Exception {
+        assertEquals(-1, new RowCounter().run(args));
+      }
+    }.read();
+
+    assertTrue(result.startsWith(line));
+  }
+
+  /** Mapping 999 rows must produce 999 Reporter.incrCounter() calls. */
+  @Test
+  @SuppressWarnings({ "deprecation", "unchecked" })
+  public void shouldRegInReportEveryIncomingRow() throws IOException {
+    int iterationNumber = 999;
+    RowCounter.RowCounterMapper mapper = new RowCounter.RowCounterMapper();
+    Reporter reporter = mock(Reporter.class);
+    for (int i = 0; i < iterationNumber; i++) {
+      mapper.map(mock(ImmutableBytesWritable.class), mock(Result.class),
+          mock(OutputCollector.class), reporter);
+    }
+
+    // incrCounter(Enum, long): the amount is a long, so match it with anyLong().
+    Mockito.verify(reporter, times(iterationNumber)).incrCounter(
+        any(Enum.class), Mockito.anyLong());
+  }
+
+  /** createSubmittableJob() must return a map-only "rowcounter" job over the given columns. */
+  @Test
+  @SuppressWarnings({ "deprecation" })
+  public void shouldCreateAndRunSubmittableJob() throws Exception {
+    RowCounter rCounter = new RowCounter();
+    rCounter.setConf(HBaseConfiguration.create());
+    // "/temp" rather than "\temp": inside a Java string literal "\t" is a TAB character.
+    String[] args = new String[] { "/temp", "tableA", "column1", "column2", "column3" };
+    JobConf jobConfig = rCounter.createSubmittableJob(args);
+
+    assertNotNull(jobConfig);
+    assertEquals(0, jobConfig.getNumReduceTasks());
+    assertEquals("rowcounter", jobConfig.getJobName());
+    assertEquals(Result.class, jobConfig.getMapOutputValueClass());
+    assertEquals(RowCounterMapper.class, jobConfig.getMapperClass());
+    assertEquals(Joiner.on(' ').join("column1", "column2", "column3"),
+        jobConfig.get(TableInputFormat.COLUMN_LIST));
+    assertEquals(ImmutableBytesWritable.class, jobConfig.getMapOutputKeyClass());
+  }
+
+  enum Outs {
+    OUT, ERR
+  }
+
+  /** Captures what doRead() prints to System.out or System.err while read() is running. */
+  private static abstract class OutputReader {
+    private final PrintStream ps;
+    private PrintStream oldPrintStream;
+    private Outs outs;
+
+    /** @param ps the stream to capture; must be the current System.out or System.err */
+    protected OutputReader(PrintStream ps) {
+      this.ps = ps;
+    }
+
+    /** Runs doRead(), returns what it wrote to the captured stream, then restores the stream. */
+    protected String read() throws Exception {
+      ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+      if (ps == System.out) {
+        oldPrintStream = System.out;
+        outs = Outs.OUT;
+        System.setOut(new PrintStream(outBytes));
+      } else if (ps == System.err) {
+        oldPrintStream = System.err;
+        outs = Outs.ERR;
+        System.setErr(new PrintStream(outBytes));
+      } else {
+        throw new IllegalStateException("OutputReader: unsupported PrintStream");
+      }
+
+      try {
+        doRead();
+        return new String(outBytes.toByteArray());
+      } finally {
+        if (outs == Outs.OUT) {
+          System.setOut(oldPrintStream);
+        } else {
+          System.setErr(oldPrintStream);
+        }
+      }
+    }
+
+    abstract void doRead() throws Exception;
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestSplitTable.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,88 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestSplitTable {
+
+  /** Creates a TableSplit, converting the table name and both row keys with Bytes.toBytes. */
+  @SuppressWarnings("deprecation")
+  private static TableSplit split(String table, String startRow, String endRow,
+      String location) {
+    return new TableSplit(Bytes.toBytes(table), Bytes.toBytes(startRow),
+        Bytes.toBytes(endRow), location);
+  }
+
+  /**
+   * Checks compareTo() on three splits of tableA whose start rows increase from a to c:
+   * every split compares equal to itself, and each pair is ordered consistently in both
+   * directions.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testSplitTableCompareTo() {
+    TableSplit[] ascending = {
+        split("tableA", "aaa", "ddd", "locationA"),
+        split("tableA", "iii", "kkk", "locationA"),
+        split("tableA", "lll", "zzz", "locationA") };
+
+    // Reflexivity: a split is never smaller or larger than itself.
+    for (TableSplit each : ascending) {
+      assertTrue(each.compareTo(each) == 0);
+    }
+
+    // Every earlier split sorts before every later one, and vice versa.
+    for (int lo = 0; lo < ascending.length; lo++) {
+      for (int hi = lo + 1; hi < ascending.length; hi++) {
+        assertTrue(ascending[lo].compareTo(ascending[hi]) < 0);
+        assertTrue(ascending[hi].compareTo(ascending[lo]) > 0);
+      }
+    }
+
+    // Last against first, checked once more on its own.
+    assertTrue(ascending[2].compareTo(ascending[0]) > 0);
+  }
+
+  /**
+   * equals() must be false as soon as the table, start row, end row or location differs,
+   * and true when all four match.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testSplitTableEquals() {
+    assertFalse(split("tableA", "aaa", "ddd", "locationA")
+        .equals(split("tableB", "aaa", "ddd", "locationA")));
+    assertFalse(split("tableA", "aaa", "ddd", "locationA")
+        .equals(split("tableA", "bbb", "ddd", "locationA")));
+    assertFalse(split("tableA", "aaa", "ddd", "locationA")
+        .equals(split("tableA", "aaa", "eee", "locationA")));
+    assertFalse(split("tableA", "aaa", "ddd", "locationA")
+        .equals(split("tableA", "aaa", "ddd", "locationB")));
+    assertTrue(split("tableA", "aaa", "ddd", "locationA")
+        .equals(split("tableA", "aaa", "ddd", "locationA")));
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java?rev=1537567&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduceUtil.java Thu Oct 31 18:08:22 2013
@@ -0,0 +1,274 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+@Category(LargeTests.class)
+public class TestTableMapReduceUtil {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestTableMapReduceUtil.class);
+
+  // Assigned once in beforeClass(); createPutCommand() writes both the
+  // president rows and the actor rows into it.
+  private static HTable presidentsTable;
+  private static final String TABLE_NAME = "People";
+
+  // Every cell written by createPutCommand() uses this family/qualifier pair.
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("info");
+  private static final byte[] COLUMN_QUALIFIER = Bytes.toBytes("name");
+
+  // Row keys and the values stored under them. The name collections are held
+  // as iterators, so createPutCommand() consumes them: a second call would
+  // find them exhausted and write nothing.
+  private static ImmutableSet<String> presidentsRowKeys = ImmutableSet.of(
+      "president1", "president2", "president3");
+  private static Iterator<String> presidentNames = ImmutableSet.of(
+      "John F. Kennedy", "George W. Bush", "Barack Obama").iterator();
+
+  private static ImmutableSet<String> actorsRowKeys = ImmutableSet.of("actor1",
+      "actor2");
+  private static Iterator<String> actorNames = ImmutableSet.of(
+      "Jack Nicholson", "Martin Freeman").iterator();
+
+  // Row-key prefixes. ClassificatorMapper emits one of these as the output
+  // key, and ClassificatorRowReduce looks the key up in 'relation' to learn
+  // how many values to expect for it.
+  private static String PRESIDENT_PATTERN = "president";
+  private static String ACTOR_PATTERN = "actor";
+  private static ImmutableMap<String, ImmutableSet<String>> relation = ImmutableMap
+      .of(PRESIDENT_PATTERN, presidentsRowKeys, ACTOR_PATTERN, actorsRowKeys);
+
+  // Shared testing utility; started in beforeClass() and shut down in afterClass().
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  /**
+   * One-time fixture setup: starts the mini cluster, creates and fills the
+   * test table, and then starts the mini MapReduce cluster.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    UTIL.startMiniCluster();
+    // The table is created and filled before the MapReduce cluster is started.
+    presidentsTable = createAndFillTable(Bytes.toBytes(TABLE_NAME));
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  /**
+   * One-time fixture teardown: shuts down the mini MapReduce cluster first and
+   * the mini cluster second, the reverse of the start-up order in beforeClass().
+   */
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Runs before every test: calls
+   * {@code UTIL.ensureSomeRegionServersAvailable(1)} and logs before and after
+   * that call.
+   */
+  @Before
+  public void before() throws IOException {
+    LOG.info("before");
+    UTIL.ensureSomeRegionServersAvailable(1);
+    LOG.info("before done");
+  }
+
+  /**
+   * Creates a table with the single column family {@code COLUMN_FAMILY} and
+   * loads the fixture rows into it via {@code createPutCommand}.
+   *
+   * @param tableName name of the table to create
+   * @return the newly created table, after the fixture rows have been written
+   * @throws IOException if table creation or any of the puts fails
+   */
+  public static HTable createAndFillTable(byte[] tableName) throws IOException {
+    final HTable filledTable = UTIL.createTable(tableName, COLUMN_FAMILY);
+    createPutCommand(filledTable);
+    return filledTable;
+  }
+
+  /**
+   * Writes the president rows, followed by the actor rows, into {@code table}.
+   * The cell values come from the static {@code presidentNames} and
+   * {@code actorNames} iterators, so a row is only written while the matching
+   * iterator still has an element left.
+   *
+   * @param table table that receives the puts
+   * @throws IOException if a put fails
+   */
+  private static void createPutCommand(HTable table) throws IOException {
+    putNamedRows(table, presidentsRowKeys, presidentNames);
+    putNamedRows(table, actorsRowKeys, actorNames);
+  }
+
+  /**
+   * Stores one put per row key, pairing each key with the next element of
+   * {@code names}; keys for which no name is left are skipped.
+   */
+  private static void putNamedRows(HTable table, Iterable<String> rowKeys,
+      Iterator<String> names) throws IOException {
+    for (String rowKey : rowKeys) {
+      if (!names.hasNext()) {
+        continue;
+      }
+      Put put = new Put(Bytes.toBytes(rowKey));
+      put.add(COLUMN_FAMILY, COLUMN_QUALIFIER, Bytes.toBytes(names.next()));
+      table.put(put);
+    }
+  }
+
+  /**
+   * Check that the number of reduce tasks for the given job configuration
+   * does not exceed the number of regions for the given table.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shouldNumberOfReduceTaskNotExceedNumberOfRegionsForGivenTable()
+      throws IOException {
+    Assert.assertNotNull(presidentsTable);
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.setScannerCaching(jobConf, 100);
+    assertEquals(1, jobConf.getNumReduceTasks());
+    assertEquals(100, jobConf.getInt("hbase.client.scanner.caching", 0));
+
+    // Ask for more reduce tasks than expected and check they are capped to 1.
+    jobConf.setNumReduceTasks(10);
+    // NOTE(review): this sets the number of *map* tasks inside a reduce-task
+    // test; presumably kept for coverage of setNumMapTasks - confirm intent.
+    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumReduceTasks(TABLE_NAME, jobConf);
+    assertEquals(1, jobConf.getNumReduceTasks());
+  }
+
+  /**
+   * Checks that, after {@code TableMapReduceUtil.limitNumMapTasks} has been
+   * applied for the test table, the job configuration reports exactly one map
+   * task, both with the initial setting and after 10 map tasks were requested.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shouldNumberOfMapTaskNotExceedNumberOfRegionsForGivenTable()
+      throws IOException {
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    // NOTE(review): this sets the number of *reduce* tasks inside a map-task
+    // test; presumably kept for coverage - confirm intent.
+    TableMapReduceUtil.setNumReduceTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+    assertEquals(1, jobConf.getNumMapTasks());
+
+    // Request 10 map tasks, then let the utility set and limit the value again.
+    jobConf.setNumMapTasks(10);
+    TableMapReduceUtil.setNumMapTasks(TABLE_NAME, jobConf);
+    TableMapReduceUtil.limitNumMapTasks(TABLE_NAME, jobConf);
+    assertEquals(1, jobConf.getNumMapTasks());
+  }
+
+  /**
+   * Runs a complete map/reduce job over the test table with a single reduce
+   * task, using {@link ClassificatorMapper} and {@link ClassificatorRowReduce},
+   * and asserts that the job reports success. The directory named by
+   * {@code hadoop.tmp.dir} is deleted afterwards whether or not the job
+   * succeeded.
+   *
+   * @throws Exception if job setup or execution fails
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shoudBeValidMapReduceEvaluation() throws Exception {
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    try {
+      jobConf.setJobName("process row task");
+      jobConf.setNumReduceTasks(1);
+      // Bytes.toString decodes as UTF-8, the inverse of the Bytes.toBytes call
+      // that produced COLUMN_FAMILY; new String(byte[]) would depend on the
+      // platform default charset.
+      TableMapReduceUtil.initTableMapJob(TABLE_NAME,
+          Bytes.toString(COLUMN_FAMILY), ClassificatorMapper.class,
+          ImmutableBytesWritable.class, Put.class, jobConf);
+      TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+          ClassificatorRowReduce.class, jobConf);
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.isSuccessful());
+    } finally {
+      // jobConf is assigned before the try block, so no null check is needed.
+      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+    }
+  }
+
+  /**
+   * Runs a complete map/reduce job over the test table with two reduce tasks
+   * and {@code HRegionPartitioner} as the partitioner, using
+   * {@link ClassificatorMapper} and {@link ClassificatorRowReduce}, and asserts
+   * that the job reports success. The directory named by
+   * {@code hadoop.tmp.dir} is deleted afterwards whether or not the job
+   * succeeded.
+   *
+   * @throws IOException if job setup or execution fails
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void shoudBeValidMapReduceWithPartitionerEvaluation()
+      throws IOException {
+    Configuration cfg = UTIL.getConfiguration();
+    JobConf jobConf = new JobConf(cfg);
+    try {
+      jobConf.setJobName("process row task");
+      jobConf.setNumReduceTasks(2);
+      // Bytes.toString decodes as UTF-8, the inverse of the Bytes.toBytes call
+      // that produced COLUMN_FAMILY; new String(byte[]) would depend on the
+      // platform default charset.
+      TableMapReduceUtil.initTableMapJob(TABLE_NAME,
+          Bytes.toString(COLUMN_FAMILY), ClassificatorMapper.class,
+          ImmutableBytesWritable.class, Put.class, jobConf);
+
+      TableMapReduceUtil.initTableReduceJob(TABLE_NAME,
+          ClassificatorRowReduce.class, jobConf, HRegionPartitioner.class);
+      RunningJob job = JobClient.runJob(jobConf);
+      assertTrue(job.isSuccessful());
+    } finally {
+      // jobConf is assigned before the try block, so no null check is needed.
+      FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
+    }
+  }
+
+  /**
+   * Reducer used by the map/reduce tests. It collects every value received for
+   * a key and asserts that their count equals the size of the row-key set that
+   * {@code relation} holds for that key. A key that is missing from
+   * {@code relation} is reported as an {@link AssertionError}.
+   */
+  @SuppressWarnings("deprecation")
+  static class ClassificatorRowReduce extends MapReduceBase implements
+      TableReduce<ImmutableBytesWritable, Put> {
+
+    @Override
+    public void reduce(ImmutableBytesWritable key, Iterator<Put> values,
+        OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
+        throws IOException {
+      final String category = Bytes.toString(key.get());
+
+      // Drain the value iterator first, exactly as many elements as were sent.
+      final List<Put> received = new ArrayList<Put>();
+      while (values.hasNext()) {
+        received.add(values.next());
+      }
+
+      if (!relation.keySet().contains(category)) {
+        throw new AssertionError(
+            "Test infrastructure error: key not found in map");
+      }
+
+      final Set<String> expectedRowKeys = relation.get(category);
+      if (expectedRowKeys == null) {
+        throw new AssertionError("Test infrastructure error: set is null");
+      }
+
+      assertEquals(expectedRowKeys.size(), received.size());
+    }
+  }
+
+  /**
+   * Mapper used by the map/reduce tests. It classifies each input row by the
+   * prefix of its row key ({@code PRESIDENT_PATTERN} or {@code ACTOR_PATTERN})
+   * and emits that prefix as the output key, together with a put that carries
+   * the row's {@code COLUMN_FAMILY}:{@code COLUMN_QUALIFIER} value. A row key
+   * with neither prefix raises an {@link AssertionError}.
+   */
+  @SuppressWarnings("deprecation")
+  static class ClassificatorMapper extends MapReduceBase implements
+      TableMap<ImmutableBytesWritable, Put> {
+
+    @Override
+    public void map(ImmutableBytesWritable row, Result result,
+        OutputCollector<ImmutableBytesWritable, Put> outCollector,
+        Reporter reporter) throws IOException {
+      final String sourceRow = Bytes.toString(result.getRow());
+
+      // Decide which category the row belongs to; the president prefix is
+      // tested first.
+      final String category;
+      if (sourceRow.startsWith(PRESIDENT_PATTERN)) {
+        category = PRESIDENT_PATTERN;
+      } else if (sourceRow.startsWith(ACTOR_PATTERN)) {
+        category = ACTOR_PATTERN;
+      } else {
+        throw new AssertionError("unexpected rowKey");
+      }
+      final ImmutableBytesWritable categoryKey = new ImmutableBytesWritable(
+          Bytes.toBytes(category));
+
+      // The value is decoded to a String and encoded again before it is
+      // stored in the outgoing put.
+      final String personName = Bytes.toString(result.getValue(COLUMN_FAMILY,
+          COLUMN_QUALIFIER));
+      final Put outgoing = new Put(Bytes.toBytes("rowKey2")).add(COLUMN_FAMILY,
+          COLUMN_QUALIFIER, Bytes.toBytes(personName));
+      outCollector.collect(categoryKey, outgoing);
+    }
+  }
+}