You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/04/15 22:56:08 UTC
svn commit: r1468223 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/mapreduce/
Author: tedyu
Date: Mon Apr 15 20:56:08 2013
New Revision: 1468223
URL: http://svn.apache.org/r1468223
Log:
HBASE-8326 mapreduce.TestTableInputFormatScan times out frequently (Nick Dimiduk)
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
Removed:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java?rev=1468223&r1=1468222&r2=1468223&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java Mon Apr 15 20:56:08 2013
@@ -25,9 +25,13 @@ import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -545,32 +549,33 @@ public class TableMapReduceUtil {
* the DistributedCache.
*/
public static void addDependencyJars(Configuration conf,
- Class... classes) throws IOException {
+ Class<?>... classes) throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
-
Set<String> jars = new HashSet<String>();
-
// Add jars that are already in the tmpjars variable
- jars.addAll( conf.getStringCollection("tmpjars") );
+ jars.addAll(conf.getStringCollection("tmpjars"));
+
+ // add jars as we find them to a map of contents jar name so that we can avoid
+ // creating new jars for classes that have already been packaged.
+ Map<String, String> packagedClasses = new HashMap<String, String>();
// Add jars containing the specified classes
- for (Class clazz : classes) {
+ for (Class<?> clazz : classes) {
if (clazz == null) continue;
- String pathStr = findOrCreateJar(clazz);
- if (pathStr == null) {
+ Path path = findOrCreateJar(clazz, localFs, packagedClasses);
+ if (path == null) {
LOG.warn("Could not find jar for class " + clazz +
" in order to ship it to the cluster.");
continue;
}
- Path path = new Path(pathStr);
if (!localFs.exists(path)) {
LOG.warn("Could not validate jar file " + path + " for class "
+ clazz);
continue;
}
- jars.add(path.makeQualified(localFs).toString());
+ jars.add(path.toString());
}
if (jars.isEmpty()) return;
@@ -584,17 +589,22 @@ public class TableMapReduceUtil {
* a directory in the classpath, it creates a Jar on the fly with the
* contents of the directory and returns the path to that Jar. If a Jar is
* created, it is created in the system temporary directory. Otherwise,
- * returns an existing jar that contains a class of the same name.
+ * returns an existing jar that contains a class of the same name. Maintains
+ * a mapping from jar contents to the tmp jar created.
* @param my_class the class to find.
+ * @param fs the FileSystem with which to qualify the returned path.
+ * @param packagedClasses a map of class name to path.
* @return a jar file that contains the class.
* @throws IOException
*/
- private static String findOrCreateJar(Class<?> my_class)
+ private static Path findOrCreateJar(Class<?> my_class, FileSystem fs,
+ Map<String, String> packagedClasses)
throws IOException {
// attempt to locate an existing jar for the class.
- String jar = findContainingJar(my_class);
+ String jar = findContainingJar(my_class, packagedClasses);
if (null == jar || jar.isEmpty()) {
jar = getJar(my_class);
+ updateMap(jar, packagedClasses);
}
if (null == jar || jar.isEmpty()) {
@@ -602,23 +612,45 @@ public class TableMapReduceUtil {
}
LOG.debug(String.format("For class %s, using jar %s", my_class.getName(), jar));
- return jar;
+ return new Path(jar).makeQualified(fs);
}
/**
- * Find a jar that contains a class of the same name, if any.
- * It will return a jar file, even if that is not the first thing
- * on the class path that has a class with the same name.
- *
- * This is shamelessly copied from JobConf
- *
+   * Add entries to <code>packagedClasses</code> corresponding to class files
+   * contained in <code>jar</code>. A <code>null</code> or empty
+   * <code>jar</code> is ignored, because <code>getJar</code> may return
+   * <code>null</code> and this method is called before that result is checked.
+   * @param jar The jar whose content to list; may be <code>null</code> or empty.
+   * @param packagedClasses map[class file name -> jar], updated in place.
+   * @throws IOException if the jar cannot be opened or read.
+   */
+  private static void updateMap(String jar, Map<String, String> packagedClasses)
+      throws IOException {
+    // findOrCreateJar calls this with getJar's result before its own null
+    // check, so bail out here instead of letting ZipFile throw an NPE.
+    if (null == jar || jar.isEmpty()) {
+      return;
+    }
+    ZipFile zip = null;
+    try {
+      zip = new ZipFile(jar);
+      for (Enumeration<? extends ZipEntry> iter = zip.entries(); iter.hasMoreElements();) {
+        ZipEntry entry = iter.nextElement();
+        // Keys must match the "<path>.class" names findContainingJar looks up.
+        if (entry.getName().endsWith(".class")) {
+          packagedClasses.put(entry.getName(), jar);
+        }
+      }
+    } finally {
+      if (null != zip) zip.close();
+    }
+  }
+
+ /**
+ * Find a jar that contains a class of the same name, if any. It will return
+ * a jar file, even if that is not the first thing on the class path that
+ * has a class with the same name. Looks first on the classpath and then in
+ * the <code>packagedClasses</code> map.
* @param my_class the class to find.
* @return a jar file that contains the class, or null.
* @throws IOException
*/
- private static String findContainingJar(Class<?> my_class) throws IOException {
+ private static String findContainingJar(Class<?> my_class, Map<String, String> packagedClasses)
+ throws IOException {
ClassLoader loader = my_class.getClassLoader();
String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
+
+ // first search the classpath
for (Enumeration<URL> itr = loader.getResources(class_file); itr.hasMoreElements();) {
URL url = itr.nextElement();
if ("jar".equals(url.getProtocol())) {
@@ -637,14 +669,18 @@ public class TableMapReduceUtil {
return toReturn.replaceAll("!.*$", "");
}
}
+
+ // now look in any jars we've packaged using JarFinder
+ for (Map.Entry<String, String> e : packagedClasses.entrySet()) {
+ if (e.getKey().equals(class_file)) return e.getValue();
+ }
return null;
}
/**
- * Invoke 'getJar' on a JarFinder implementation. Useful for some job configuration
- * contexts (HBASE-8140) and also for testing on MRv2. First check if we have
- * HADOOP-9426. Lacking that, fall back to the backport.
- *
+ * Invoke 'getJar' on a JarFinder implementation. Useful for some job
+ * configuration contexts (HBASE-8140) and also for testing on MRv2. First
+ * check if we have HADOOP-9426. Lacking that, fall back to the backport.
* @param my_class the class to find.
* @return a jar file that contains the class, or null.
*/
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java?rev=1468223&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan1.java Mon Apr 15 20:56:08 2013
@@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableInputFormatScan part 1: scans that have no start row and end at a
+ * variety of stop rows.
+ * @see TestTableInputFormatScanBase
+ */
+@Category(LargeTests.class)
+public class TestTableInputFormatScan1 extends TestTableInputFormatScanBase {
+
+  /**
+   * Tests a MR scan with neither a start row nor a stop row. Both expected
+   * boundary rows are empty, so the reducer checks neither the first nor the
+   * last row it sees.
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanEmptyToEmpty()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, null, null);
+  }
+
+  /**
+   * Tests a MR scan with no start row and stop row "app", expecting "apo" to
+   * be the last row reduced. No first-row check is made without a start row.
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanEmptyToAPP()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "app", "apo");
+  }
+
+  /**
+   * Tests a MR scan with no start row and stop row "bba", expecting "baz" to
+   * be the last row reduced. No first-row check is made without a start row.
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanEmptyToBBA()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "bba", "baz");
+  }
+
+  /**
+   * Tests a MR scan with no start row and stop row "bbb", expecting "bba" to
+   * be the last row reduced. No first-row check is made without a start row.
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanEmptyToBBB()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "bbb", "bba");
+  }
+
+  /**
+   * Tests a MR scan with no start row and stop row "opp", expecting "opo" to
+   * be the last row reduced. No first-row check is made without a start row.
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanEmptyToOPP()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan(null, "opp", "opo");
+  }
+
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java?rev=1468223&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScan2.java Mon Apr 15 20:56:08 2013
@@ -0,0 +1,117 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.LargeTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * TestTableInputFormatScan part 2: scans with an explicit start row, plus one
+ * scan configured purely through Configuration properties.
+ * @see TestTableInputFormatScanBase
+ */
+@Category(LargeTests.class)
+public class TestTableInputFormatScan2 extends TestTableInputFormatScanBase {
+
+  /**
+   * Tests a MR scan from start row "obb" to stop row "opp", expecting the
+   * first row reduced to be "obb" and the last to be "opo".
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanOBBToOPP()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("obb", "opp", "opo");
+  }
+
+  /**
+   * Tests a MR scan from start row "obb" to stop row "qpp", expecting the
+   * first row reduced to be "obb" and the last to be "qpo".
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanOBBToQPP()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("obb", "qpp", "qpo");
+  }
+
+  /**
+   * Tests a MR scan from start row "opp" with no stop row, expecting the
+   * first row reduced to be "opp" and the last to be "zzz".
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanOPPToEmpty()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("opp", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan from start row "yyx" with no stop row, expecting the
+   * first row reduced to be "yyx" and the last to be "zzz".
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanYYXToEmpty()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yyx", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan from start row "yyy" with no stop row, expecting the
+   * first row reduced to be "yyy" and the last to be "zzz".
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanYYYToEmpty()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yyy", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan from start row "yzy" with no stop row, expecting the
+   * first row reduced to be "yzy" and the last to be "zzz".
+   *
+   * @throws IOException propagated from testScan
+   * @throws ClassNotFoundException propagated from testScan
+   * @throws InterruptedException propagated from testScan
+   */
+  @Test
+  public void testScanYZYToEmpty()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScan("yzy", null, "zzz");
+  }
+
+  /**
+   * Tests a MR scan whose bounds come from TableInputFormat Configuration
+   * properties rather than a Scan object: start row "bba", stop row "bbd",
+   * expecting the first row reduced to be "bba" and the last to be "bbc".
+   *
+   * @throws IOException propagated from testScanFromConfiguration
+   * @throws ClassNotFoundException propagated from testScanFromConfiguration
+   * @throws InterruptedException propagated from testScanFromConfiguration
+   */
+  @Test
+  public void testScanFromConfiguration()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    testScanFromConfiguration("bba", "bbd", "bbc");
+  }
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java?rev=1468223&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatScanBase.java Mon Apr 15 20:56:08 2013
@@ -0,0 +1,239 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * <p>
+ * Tests various scan start and stop row scenarios. This is set in a scan and
+ * tested in a MapReduce job to see if that is handed over and done properly
+ * too.
+ * </p>
+ * <p>
+ * This test is broken into two parts, TestTableInputFormatScan1 and
+ * TestTableInputFormatScan2, in order to side-step the test timeout period of
+ * 900, as documented in HBASE-8326. Each part runs against the mini clusters
+ * and the "scantest" table created in setUpBeforeClass.
+ * </p>
+ */
+public abstract class TestTableInputFormatScanBase {
+
+  static final Log LOG = LogFactory.getLog(TestTableInputFormatScanBase.class);
+  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  /** Configuration key for the expected first row; empty disables the check. */
+  static final String KEY_STARTROW = "startRow";
+  /** Configuration key for the expected last row; empty disables the check. */
+  static final String KEY_LASTROW = "stpRow";
+
+  private static HTable table = null;
+
+  /**
+   * Starts the mini HBase cluster, creates the test table, splits it with
+   * createMultiRegions, loads it, and finally starts the mini MapReduce cluster.
+   *
+   * @throws Exception if any cluster or table setup step fails.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // switch TIF to log at DEBUG level
+    TEST_UTIL.enableDebug(TableInputFormat.class);
+    TEST_UTIL.enableDebug(TableInputFormatBase.class);
+    // start mini hbase cluster
+    TEST_UTIL.startMiniCluster(3);
+    // create and fill table
+    table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
+    TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
+    TEST_UTIL.loadTable(table, INPUT_FAMILY);
+    // start MR cluster
+    TEST_UTIL.startMiniMapReduceCluster();
+  }
+
+  /**
+   * Closes the test table, then shuts down the MapReduce and HBase mini
+   * clusters. The clusters are shut down even if closing the table fails.
+   *
+   * @throws Exception if closing the table or shutting down a cluster fails.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      // Release the HTable opened in setUpBeforeClass while its cluster is up.
+      if (table != null) {
+        table.close();
+        table = null;
+      }
+    } finally {
+      TEST_UTIL.shutdownMiniMapReduceCluster();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Checks that each row holds exactly one cell, in INPUT_FAMILY, and emits
+   * the row key as both the output key and the output value.
+   */
+  public static class ScanMapper
+      extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
+
+    /**
+     * Validates the row and passes its key on to the reducer.
+     *
+     * @param key The row key, here "aaa", "aab" etc.
+     * @param value The row's cells; must be exactly one cell in INPUT_FAMILY.
+     * @param context The task context.
+     * @throws IOException When the row does not hold exactly one cell or
+     *     lacks INPUT_FAMILY.
+     * @throws InterruptedException propagated from context.write.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result value,
+        Context context)
+        throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+          cf = value.getMap();
+      if (!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
+      LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
+          ", value -> " + val);
+      context.write(key, key);
+    }
+
+  }
+
+  /**
+   * Records the first and last values seen across all reduce calls and, in
+   * cleanup, compares them with the rows stored under KEY_STARTROW and
+   * KEY_LASTROW. An empty expected value skips that comparison. The drivers
+   * below call setNumReduceTasks(1) so that one reducer sees every key.
+   */
+  public static class ScanReducer
+      extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
+                      NullWritable, NullWritable> {
+
+    private String first = null;
+    private String last = null;
+
+    /**
+     * Logs every value for the key and updates the first/last values seen.
+     */
+    @Override
+    protected void reduce(ImmutableBytesWritable key,
+        Iterable<ImmutableBytesWritable> values, Context context)
+        throws IOException, InterruptedException {
+      int count = 0;
+      for (ImmutableBytesWritable value : values) {
+        String val = Bytes.toStringBinary(value.get());
+        LOG.info("reduce: key[" + count + "] -> " +
+            Bytes.toStringBinary(key.get()) + ", value -> " + val);
+        if (first == null) first = val;
+        last = val;
+        count++;
+      }
+    }
+
+    /**
+     * Asserts the first/last values against the expected rows from the job
+     * Configuration. An assertion failure here is meant to fail the job,
+     * which the driver's assertTrue(job.waitForCompletion(true)) then reports.
+     */
+    @Override
+    protected void cleanup(Context context)
+        throws IOException, InterruptedException {
+      Configuration c = context.getConfiguration();
+      String startRow = c.get(KEY_STARTROW);
+      String lastRow = c.get(KEY_LASTROW);
+      LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
+      LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
+      if (startRow != null && startRow.length() > 0) {
+        assertEquals(startRow, first);
+      }
+      if (lastRow != null && lastRow.length() > 0) {
+        assertEquals(lastRow, last);
+      }
+    }
+
+  }
+
+  /**
+   * Tests an MR Scan initialized from properties set in the Configuration
+   * (TableInputFormat.SCAN_ROW_START / SCAN_ROW_STOP) instead of a Scan object.
+   *
+   * @param start the start row, also the expected first row; null sets no
+   *     start row and skips the first-row check.
+   * @param stop the stop row; null sets no stop row.
+   * @param last the expected last row; null skips the last-row check.
+   * @throws IOException if the job cannot be set up or run.
+   * @throws ClassNotFoundException propagated from job.waitForCompletion.
+   * @throws InterruptedException propagated from job.waitForCompletion.
+   */
+  protected void testScanFromConfiguration(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "ScanFromConfig" + (start != null ? start.toUpperCase() : "Empty") +
+        "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.set(TableInputFormat.INPUT_TABLE, Bytes.toString(TABLE_NAME));
+    c.set(TableInputFormat.SCAN_COLUMN_FAMILY, Bytes.toString(INPUT_FAMILY));
+    c.set(KEY_STARTROW, start != null ? start : "");
+    c.set(KEY_LASTROW, last != null ? last : "");
+
+    if (start != null) {
+      c.set(TableInputFormat.SCAN_ROW_START, start);
+    }
+
+    if (stop != null) {
+      c.set(TableInputFormat.SCAN_ROW_STOP, stop);
+    }
+
+    Job job = new Job(c, jobName);
+    job.setMapperClass(ScanMapper.class);
+    job.setReducerClass(ScanReducer.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    job.setMapOutputValueClass(ImmutableBytesWritable.class);
+    job.setInputFormatClass(TableInputFormat.class);
+    job.setNumReduceTasks(1);
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    TableMapReduceUtil.addDependencyJars(job);
+    assertTrue(job.waitForCompletion(true));
+  }
+
+  /**
+   * Tests a MR scan whose bounds are set on a Scan object passed to
+   * TableMapReduceUtil.initTableMapperJob.
+   *
+   * @param start the start row, also the expected first row; null sets no
+   *     start row and skips the first-row check.
+   * @param stop the stop row; null sets no stop row.
+   * @param last the expected last row; null skips the last-row check.
+   * @throws IOException if the job cannot be set up or run.
+   * @throws ClassNotFoundException propagated from job.waitForCompletion.
+   * @throws InterruptedException propagated from job.waitForCompletion.
+   */
+  protected void testScan(String start, String stop, String last)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
+        "To" + (stop != null ? stop.toUpperCase() : "Empty");
+    LOG.info("Before map/reduce startup - job " + jobName);
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    if (start != null) {
+      scan.setStartRow(Bytes.toBytes(start));
+    }
+    c.set(KEY_STARTROW, start != null ? start : "");
+    if (stop != null) {
+      scan.setStopRow(Bytes.toBytes(stop));
+    }
+    c.set(KEY_LASTROW, last != null ? last : "");
+    LOG.info("scan before: " + scan);
+    Job job = new Job(c, jobName);
+    TableMapReduceUtil.initTableMapperJob(
+        Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
+        ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
+    job.setReducerClass(ScanReducer.class);
+    job.setNumReduceTasks(1); // one to get final "first" and "last" key
+    FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
+    LOG.info("Started " + job.getJobName());
+    assertTrue(job.waitForCompletion(true));
+    LOG.info("After map/reduce completion - job " + jobName);
+  }
+
+}
+