You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by dd...@apache.org on 2013/03/26 21:47:19 UTC

svn commit: r1461303 - /hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java

Author: ddas
Date: Tue Mar 26 20:47:19 2013
New Revision: 1461303

URL: http://svn.apache.org/r1461303
Log:
HBASE-8147. Adds an integration test for HBASE-8140 (issue observed when commitJob was launching another job).

Modified:
    hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java

Modified: hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java?rev=1461303&r1=1461302&r2=1461303&view=diff
==============================================================================
--- hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java (original)
+++ hbase/trunk/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java Tue Mar 26 20:47:19 2013
@@ -16,9 +16,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
@@ -27,6 +30,17 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
@@ -154,6 +168,7 @@ public class IntegrationTestImportTsv im
 
   @Test
   public void testGenerateAndLoad() throws Exception {
+    LOG.info("Running test testGenerateAndLoad.");
     String table = NAME + "-" + UUID.randomUUID();
     String cf = "d";
     Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles");
@@ -179,6 +194,153 @@ public class IntegrationTestImportTsv im
     // clean up after ourselves.
     util.deleteTable(table);
     util.cleanupDataTestDirOnTestFS(table);
+    LOG.info("testGenerateAndLoad completed successfully.");
+  }
+
+  //
+  // helper classes used in the following test.
+  //
+
+  /**
+   * A {@link FileOutputCommitter} that launches an ImportTsv job through
+   * its {@link #commitJob(JobContext)} method.
+   */
+  private static class JobLaunchingOuputCommitter extends FileOutputCommitter {
+
+    public JobLaunchingOuputCommitter(Path outputPath, TaskAttemptContext context)
+        throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      super.commitJob(context);
+
+      // inherit jar dependencies added to distributed cache loaded by parent job
+      Configuration conf = HBaseConfiguration.create();
+      conf.set("mapred.job.classpath.archives",
+        context.getConfiguration().get("mapred.job.classpath.archives", ""));
+      conf.set("mapreduce.job.cache.archives.visibilities",
+        context.getConfiguration().get("mapreduce.job.cache.archives.visibilities", ""));
+
+      // can't use IntegrationTest instance of util because it hasn't been
+      // instantiated on the JVM running this method. Create our own.
+      IntegrationTestingUtility util =
+          new IntegrationTestingUtility(conf);
+
+      // this is why we're here: launch a child job. The rest of this should
+      // look a lot like TestImportTsv#testMROnTable.
+      final String table = format("%s-%s-child", NAME, context.getJobID());
+      final String cf = "FAM";
+
+      String[] args = {
+          "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+          "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+          table
+      };
+
+      try {
+        util.createTable(table, cf);
+        LOG.info("testRunFromOutputCommitter: launching child job.");
+        TestImportTsv.doMROnTableTest(util, cf, null, args, 1);
+      } catch (Exception e) {
+        throw new IOException("Underlying MapReduce job failed. Aborting commit.", e);
+      } finally {
+        util.deleteTable(table);
+      }
+    }
+  }
+
+  /**
+   * An {@link OutputFormat} that exposes the <code>JobLaunchingOutputCommitter</code>.
+   */
+  public static class JobLaunchingOutputFormat extends FileOutputFormat<LongWritable, Text> {
+
+    private OutputCommitter committer = null;
+
+    @Override
+    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
+        throws IOException, InterruptedException {
+      return new RecordWriter<LongWritable, Text>() {
+        @Override
+        public void write(LongWritable key, Text value) throws IOException,
+            InterruptedException {
+          /* do nothing */
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+          /* do nothing */
+        }
+      };
+    }
+
+    @Override
+    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
+        throws IOException {
+      if (committer == null) {
+        Path output = getOutputPath(context);
+        LOG.debug("Using JobLaunchingOuputCommitter.");
+        committer = new JobLaunchingOuputCommitter(output, context);
+      }
+      return committer;
+    }
+  }
+
+  /**
+   * Add classes necessary for integration-test jobs.
+   */
+  public static void addTestDependencyJars(Configuration conf) throws IOException {
+    TableMapReduceUtil.addDependencyJars(conf,
+      org.apache.hadoop.hbase.BaseConfigurable.class, // hbase-server
+      HBaseTestingUtility.class,                      // hbase-server-test
+      HBaseCommonTestingUtility.class,                // hbase-common-test
+      com.google.common.collect.ListMultimap.class,   // Guava
+      org.cloudera.htrace.Trace.class);               // HTrace
+  }
+
+  /**
+   * {@link TableMapReduceUtil#addDependencyJars(Job)} is used when
+   * configuring a mapreduce job to ensure dependencies of the job are shipped
+   * to the cluster. Sometimes those dependencies are on the classpath, but not
+   * packaged as a jar, for instance, when run at the end of another mapreduce
+   * job. In that case, dependency jars have already been shipped to the cluster
+   * and expanded in the parent job's run folder. This test validates the child
+   * job's classpath is constructed correctly under that scenario.
+   */
+  @Test
+  public void testRunFromOutputCommitter() throws Exception {
+    LOG.info("Running test testRunFromOutputCommitter.");
+
+    FileSystem fs = FileSystem.get(getConf());
+    Path inputPath = new Path(util.getDataTestDirOnTestFS("parent"), "input.txt");
+    Path outputPath = new Path(util.getDataTestDirOnTestFS("parent"), "output");
+    FSDataOutputStream fout = null;
+    try {
+      fout = fs.create(inputPath, true);
+      fout.write(Bytes.toBytes("testRunFromOutputCommitter\n"));
+      LOG.debug(format("Wrote test data to file: %s", inputPath));
+    } finally {
+      fout.close();
+    }
+
+    // create a parent job that ships the HBase dependencies. This is
+    // accurate as the expected calling context.
+    Job job = new Job(getConf(), NAME + ".testRunFromOutputCommitter - parent");
+    job.setJarByClass(IntegrationTestImportTsv.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(JobLaunchingOutputFormat.class);
+    TextInputFormat.addInputPath(job, inputPath);
+    JobLaunchingOutputFormat.setOutputPath(job, outputPath);
+    TableMapReduceUtil.addDependencyJars(job);
+    addTestDependencyJars(job.getConfiguration());
+
+    // Job launched by the OutputCommitter will fail if dependency jars are
+    // not shipped properly.
+    LOG.info("testRunFromOutputCommitter: launching parent job.");
+    assertTrue(job.waitForCompletion(true));
+    LOG.info("testRunFromOutputCommitter completed successfully.");
   }
 
   public int run(String[] args) throws Exception {
@@ -194,6 +356,7 @@ public class IntegrationTestImportTsv im
     // IntegrationTestsDriver does.
     provisionCluster();
     testGenerateAndLoad();
+    testRunFromOutputCommitter();
     releaseCluster();
 
     return 0;