Posted to commits@hbase.apache.org by te...@apache.org on 2012/04/02 15:50:03 UTC
svn commit: r1308353 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapred/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: tedyu
Date: Mon Apr 2 13:50:03 2012
New Revision: 1308353
URL: http://svn.apache.org/viewvc?rev=1308353&view=rev
Log:
HBASE-5663 HBASE-5636 MultithreadedTableMapper doesn't work (Takuya Ueshin)
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
Removed:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java?rev=1308353&r1=1308352&r2=1308353&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java Mon Apr 2 13:50:03 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,11 +32,14 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;
@@ -239,15 +243,17 @@ public class MultithreadedTableMapper<K2
context.getConfiguration());
try {
Constructor c = context.getClass().getConstructor(
+ Mapper.class,
Configuration.class,
- outer.getTaskAttemptID().getClass(),
- SubMapRecordReader.class,
- SubMapRecordWriter.class,
- context.getOutputCommitter().getClass(),
- SubMapStatusReporter.class,
- outer.getInputSplit().getClass());
+ TaskAttemptID.class,
+ RecordReader.class,
+ RecordWriter.class,
+ OutputCommitter.class,
+ StatusReporter.class,
+ InputSplit.class);
c.setAccessible(true);
subcontext = (Context) c.newInstance(
+ mapper,
outer.getConfiguration(),
outer.getTaskAttemptID(),
new SubMapRecordReader(),
@@ -256,8 +262,31 @@ public class MultithreadedTableMapper<K2
new SubMapStatusReporter(),
outer.getInputSplit());
} catch (Exception e) {
- // rethrow as IOE
- throw new IOException(e);
+ try {
+ Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor(
+ Configuration.class,
+ TaskAttemptID.class,
+ RecordReader.class,
+ RecordWriter.class,
+ OutputCommitter.class,
+ StatusReporter.class,
+ InputSplit.class);
+ c.setAccessible(true);
+ MapContext mc = (MapContext) c.newInstance(
+ outer.getConfiguration(),
+ outer.getTaskAttemptID(),
+ new SubMapRecordReader(),
+ new SubMapRecordWriter(),
+ context.getOutputCommitter(),
+ new SubMapStatusReporter(),
+ outer.getInputSplit());
+ Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
+ Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
+ subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
+ } catch (Exception ee) {
+ // rethrow as IOE
+ throw new IOException(e);
+ }
}
}
@@ -270,4 +299,4 @@ public class MultithreadedTableMapper<K2
}
}
}
-}
\ No newline at end of file
+}
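
A note on the hunks above, plus a sketch. The old lookup passed the concrete argument classes (SubMapRecordReader.class, context.getOutputCommitter().getClass(), and so on) to getConstructor, which matches only exact declared parameter types, so the constructor was never found; the fix passes the declared types plus the leading Mapper parameter that a non-static inner class constructor takes, then falls back to the Hadoop 2.x MapContextImpl/WrappedMapper route when the 1.x constructor is absent. The condensed, self-contained sketch below shows the same dual-path technique in isolation; SubContextFactory and createSubContext are illustrative names only, and Class.forName on Mapper$Context stands in for the patch's context.getClass():

    import java.io.IOException;
    import java.lang.reflect.Constructor;
    import java.lang.reflect.Method;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.MapContext;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.StatusReporter;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    public class SubContextFactory {

      // Build a Mapper.Context reflectively so one jar works on both
      // Hadoop 1.x and 2.x. SubContextFactory/createSubContext are
      // illustrative names, not part of the patch.
      @SuppressWarnings("unchecked")
      public static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context createSubContext(
          Mapper<K1, V1, K2, V2> mapper, Configuration conf, TaskAttemptID id,
          RecordReader<K1, V1> reader, RecordWriter<K2, V2> writer,
          OutputCommitter committer, StatusReporter reporter, InputSplit split)
          throws IOException {
        try {
          // Hadoop 1.x: Mapper.Context is a non-static inner class, so its
          // reflective constructor takes the enclosing Mapper first. Note the
          // declared parameter types (RecordReader.class etc.), not the runtime
          // classes of the arguments -- that mismatch was the original bug.
          Constructor<?> c = Class.forName("org.apache.hadoop.mapreduce.Mapper$Context")
              .getConstructor(Mapper.class, Configuration.class, TaskAttemptID.class,
                  RecordReader.class, RecordWriter.class, OutputCommitter.class,
                  StatusReporter.class, InputSplit.class);
          c.setAccessible(true);
          return (Mapper<K1, V1, K2, V2>.Context) c.newInstance(
              mapper, conf, id, reader, writer, committer, reporter, split);
        } catch (Exception e) {
          try {
            // Hadoop 2.x: Mapper.Context is abstract; build a MapContextImpl
            // and adapt it through WrappedMapper.getMapContext, all by name so
            // there is no compile-time dependency on 2.x-only classes.
            Constructor<?> c = Class
                .forName("org.apache.hadoop.mapreduce.task.MapContextImpl")
                .getConstructor(Configuration.class, TaskAttemptID.class,
                    RecordReader.class, RecordWriter.class, OutputCommitter.class,
                    StatusReporter.class, InputSplit.class);
            c.setAccessible(true);
            MapContext<K1, V1, K2, V2> mc = (MapContext<K1, V1, K2, V2>) c
                .newInstance(conf, id, reader, writer, committer, reporter, split);
            Class<?> wrapped =
                Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
            Method getMapContext = wrapped.getMethod("getMapContext", MapContext.class);
            return (Mapper<K1, V1, K2, V2>.Context) getMapContext
                .invoke(wrapped.newInstance(), mc);
          } catch (Exception ee) {
            throw new IOException(ee);
          }
        }
      }
    }

Because everything 2.x-specific is resolved by name, the class still compiles and loads against a Hadoop 1.x-only classpath.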
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=1308353&r1=1308352&r2=1308353&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Mon Apr 2 13:50:03 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapred;
import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
@@ -28,7 +29,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -42,11 +42,15 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
/**
* Test Map/Reduce job over HBase tables. The map/reduce process we're testing
* on our tables is simple - take every row in the table, reverse the value of
@@ -58,7 +62,7 @@ public class TestTableMapReduce {
LogFactory.getLog(TestTableMapReduce.class.getName());
private static final HBaseTestingUtility UTIL =
new HBaseTestingUtility();
- static final String MULTI_REGION_TABLE_NAME = "mrtest";
+ static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
@@ -69,12 +73,10 @@ public class TestTableMapReduce {
@BeforeClass
public static void beforeClass() throws Exception {
- HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
- desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
- desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
UTIL.startMiniCluster();
- HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
- admin.createTable(desc, HBaseTestingUtility.KEYS);
+ HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+ UTIL.createMultiRegions(table, INPUT_FAMILY);
+ UTIL.loadTable(table, INPUT_FAMILY);
UTIL.startMiniMapReduceCluster();
}
@@ -150,7 +152,8 @@ public class TestTableMapReduce {
IdentityTableReduce.class, jobConf);
LOG.info("Started " + Bytes.toString(table.getTableName()));
- JobClient.runJob(jobConf);
+ RunningJob job = JobClient.runJob(jobConf);
+ assertTrue(job.isSuccessful());
LOG.info("After map/reduce completion");
// verify map-reduce results
@@ -184,7 +187,7 @@ public class TestTableMapReduce {
// continue
}
}
- org.junit.Assert.assertTrue(verified);
+ assertTrue(verified);
}
/**
@@ -199,7 +202,10 @@ public class TestTableMapReduce {
TableInputFormat.addColumns(scan, columns);
ResultScanner scanner = table.getScanner(scan);
try {
- for (Result r : scanner) {
+ Iterator<Result> itr = scanner.iterator();
+ assertTrue(itr.hasNext());
+ while(itr.hasNext()) {
+ Result r = itr.next();
if (LOG.isDebugEnabled()) {
if (r.size() > 2 ) {
throw new IOException("Too many results, expected 2 got " +
@@ -247,7 +253,7 @@ public class TestTableMapReduce {
r.getRow() + ", first value=" + first + ", second value=" +
second);
}
- org.junit.Assert.fail();
+ fail();
}
}
} finally {
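
One pattern in this file's changes is worth calling out: the RunningJob returned by JobClient.runJob is now asserted successful, and the verification scan pulls an explicit Iterator so that an empty scanner fails the test rather than silently skipping the for-each body. A minimal sketch of that guard, with verifyNonEmptyScan as an illustrative name and client calls matching the 0.94-era API used in this commit:

    import static org.junit.Assert.assertTrue;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class ScanAssertions {

      // Fail fast if the scan matches nothing: a plain for-each over an empty
      // ResultScanner runs zero iterations and would let the test pass vacuously.
      static void verifyNonEmptyScan(HTable table, Scan scan) throws IOException {
        ResultScanner scanner = table.getScanner(scan);
        try {
          Iterator<Result> itr = scanner.iterator();
          assertTrue("scan returned no rows", itr.hasNext());
          while (itr.hasNext()) {
            Result r = itr.next();
            // per-row verification goes here, e.g. the r.size() bound
            // checked in the tests above
          }
        } finally {
          scanner.close();
        }
      }
    }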
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java?rev=1308353&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java Mon Apr 2 13:50:03 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category(LargeTests.class)
+public class TestMultithreadedTableMapper {
+ private static final Log LOG = LogFactory.getLog(TestMultithreadedTableMapper.class);
+ private static final HBaseTestingUtility UTIL =
+ new HBaseTestingUtility();
+ static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+ static final int NUMBER_OF_THREADS = 10;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+ UTIL.createMultiRegions(table, INPUT_FAMILY);
+ UTIL.loadTable(table, INPUT_FAMILY);
+ UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniMapReduceCluster();
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Pass the given key and processed record to reduce
+ */
+ public static class ProcessContentsMapper
+ extends TableMapper<ImmutableBytesWritable, Put> {
+
+ /**
+ * Pass the key, and reversed value to reduce
+ *
+ * @param key
+ * @param value
+ * @param context
+ * @throws IOException
+ */
+ public void map(ImmutableBytesWritable key, Result value,
+ Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if(!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ // Get the original value and reverse it
+ String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+ HConstants.UTF8_ENCODING);
+ StringBuilder newValue = new StringBuilder(originalValue);
+ newValue.reverse();
+ // Now set the value to be collected
+ Put outval = new Put(key.get());
+ outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+ context.write(key, outval);
+ }
+ }
+
+ /**
+ * Test MultithreadedTableMapper map/reduce against a multi-region table
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testMultithreadedTableMapper()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
+ MULTI_REGION_TABLE_NAME));
+ }
+
+ private void runTestOnTable(HTable table)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = null;
+ try {
+ LOG.info("Before map/reduce startup");
+ job = new Job(table.getConfiguration(), "process column contents");
+ job.setNumReduceTasks(1);
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ TableMapReduceUtil.initTableMapperJob(
+ Bytes.toString(table.getTableName()), scan,
+ MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+ Put.class, job);
+ MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
+ MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
+ TableMapReduceUtil.initTableReducerJob(
+ Bytes.toString(table.getTableName()),
+ IdentityTableReducer.class, job);
+ FileOutputFormat.setOutputPath(job, new Path("test"));
+ LOG.info("Started " + Bytes.toString(table.getTableName()));
+ assertTrue(job.waitForCompletion(true));
+ LOG.info("After map/reduce completion");
+ // verify map-reduce results
+ verify(Bytes.toString(table.getTableName()));
+ } finally {
+ table.close();
+ if (job != null) {
+ FileUtil.fullyDelete(
+ new File(job.getConfiguration().get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ private void verify(String tableName) throws IOException {
+ HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
+ boolean verified = false;
+ long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ LOG.info("Verification attempt #" + i);
+ verifyAttempt(table);
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume its because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ LOG.debug("Verification attempt failed: " + e.getMessage());
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ assertTrue(verified);
+ table.close();
+ }
+
+ /**
+ * Looks at every value of the mapreduce output and verifies that indeed
+ * the values have been reversed.
+ *
+ * @param table Table to scan.
+ * @throws IOException
+ * @throws NullPointerException if we failed to find a cell value
+ */
+ private void verifyAttempt(final HTable table)
+ throws IOException, NullPointerException {
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ scan.addFamily(OUTPUT_FAMILY);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ Iterator<Result> itr = scanner.iterator();
+ assertTrue(itr.hasNext());
+ while(itr.hasNext()) {
+ Result r = itr.next();
+ if (LOG.isDebugEnabled()) {
+ if (r.size() > 2 ) {
+ throw new IOException("Too many results, expected 2 got " +
+ r.size());
+ }
+ }
+ byte[] firstValue = null;
+ byte[] secondValue = null;
+ int count = 0;
+ for(KeyValue kv : r.list()) {
+ if (count == 0) {
+ firstValue = kv.getValue();
+ } else if (count == 1) {
+ secondValue = kv.getValue();
+ } else if (count == 2) {
+ break;
+ }
+ count++;
+ }
+ String first = "";
+ if (firstValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": first value is null");
+ }
+ first = new String(firstValue, HConstants.UTF8_ENCODING);
+ String second = "";
+ if (secondValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": second value is null");
+ }
+ byte[] secondReversed = new byte[secondValue.length];
+ for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+ secondReversed[i] = secondValue[j];
+ }
+ second = new String(secondReversed, HConstants.UTF8_ENCODING);
+ if (first.compareTo(second) != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("second key is not the reverse of first. row=" +
+ Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+ ", second value=" + second);
+ }
+ fail();
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+
+ @org.junit.Rule
+ public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+ new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
+
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java?rev=1308353&r1=1308352&r2=1308353&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java Mon Apr 2 13:50:03 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
@@ -30,7 +31,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -59,18 +59,16 @@ public class TestTableMapReduce {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
private static final HBaseTestingUtility UTIL =
new HBaseTestingUtility();
- static final String MULTI_REGION_TABLE_NAME = "mrtest";
+ static final byte[] MULTI_REGION_TABLE_NAME = Bytes.toBytes("mrtest");
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
@BeforeClass
public static void beforeClass() throws Exception {
- HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
- desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
- desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
UTIL.startMiniCluster();
- HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
- admin.createTable(desc, HBaseTestingUtility.KEYS);
+ HTable table = UTIL.createTable(MULTI_REGION_TABLE_NAME, new byte[][] {INPUT_FAMILY, OUTPUT_FAMILY});
+ UTIL.createMultiRegions(table, INPUT_FAMILY);
+ UTIL.loadTable(table, INPUT_FAMILY);
UTIL.startMiniMapReduceCluster();
}
@@ -150,7 +148,7 @@ public class TestTableMapReduce {
IdentityTableReducer.class, job);
FileOutputFormat.setOutputPath(job, new Path("test"));
LOG.info("Started " + Bytes.toString(table.getTableName()));
- job.waitForCompletion(true);
+ assertTrue(job.waitForCompletion(true));
LOG.info("After map/reduce completion");
// verify map-reduce results
@@ -204,7 +202,10 @@ public class TestTableMapReduce {
scan.addFamily(OUTPUT_FAMILY);
ResultScanner scanner = table.getScanner(scan);
try {
- for (Result r : scanner) {
+ Iterator<Result> itr = scanner.iterator();
+ assertTrue(itr.hasNext());
+ while(itr.hasNext()) {
+ Result r = itr.next();
if (LOG.isDebugEnabled()) {
if (r.size() > 2 ) {
throw new IOException("Too many results, expected 2 got " +