You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/10/20 09:11:41 UTC

svn commit: r826979 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java src/java/org/apache/hadoop/mapred/TaskLog.java src/java/org/apache/hadoop/mapred/TaskRunner.java

Author: yhemanth
Date: Tue Oct 20 07:11:41 2009
New Revision: 826979

URL: http://svn.apache.org/viewvc?rev=826979&view=rev
Log:
MAPREDUCE-1086. Setup Hadoop logging environment for tasks to point to task related parameters. Contributed by Ravi Gummadi.

Added:
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=826979&r1=826978&r2=826979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Oct 20 07:11:41 2009
@@ -794,3 +794,7 @@
 
     MAPREDUCE-1070. Prevent a deadlock in the fair scheduler servlet.
     (Todd Lipcon via cdouglas)
+
+    MAPREDUCE-1086. Setup Hadoop logging environment for tasks to point to
+    task related parameters. (Ravi Gummadi via yhemanth)
+

Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=826979&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Tue Oct 20 07:11:41 2009
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.Shell;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the environment set by the TaskTracker for the children of a task
+ * JVM. Launches a streaming job whose mapper is a shell script that echoes
+ * HADOOP_ROOT_LOGGER and HADOOP_CLIENT_OPTS, and checks the echoed values.
+ */
+public class TestStreamingTaskLog extends TestCase {
+  String input = "the dummy input";
+  Path inputPath = new Path("inDir");
+  Path outputPath = new Path("outDir");
+  String map = null;
+  MiniMRCluster mr = null;
+  FileSystem fs = null;
+  final long USERLOG_LIMIT_KB = 5;//consider 5kb as logSize
+
+  /**
+   * Builds the streaming job arguments; {@link #map}, {@link #mr} and
+   * {@link #fs} must already be initialized.
+   */
+  String[] genArgs() {
+    return new String[] {
+      "-input", inputPath.toString(),
+      "-output", outputPath.toString(),
+      "-mapper", map,
+      "-reducer", StreamJob.REDUCE_NONE,
+      "-jobconf", "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort(),
+      "-jobconf", "fs.default.name=" + fs.getUri().toString(),
+      "-jobconf", "mapred.map.tasks=1",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "mapreduce.task.userlog.limit.kb=" + USERLOG_LIMIT_KB,
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  /**
+   * This test validates the setting of HADOOP_ROOT_LOGGER to 'INFO,TLA' and the
+   * dependent properties
+   *  (a) hadoop.tasklog.taskid and
+   *  (b) hadoop.tasklog.totalLogFileSize
+   * for the children of java tasks in streaming jobs.
+   * IOExceptions propagate so that JUnit reports the full stack trace.
+   */
+  public void testStreamingTaskLogWithHadoopCmd() throws IOException {
+    try {
+      final int numSlaves = 1;
+      Configuration conf = new Configuration();
+
+      fs = FileSystem.getLocal(conf);
+      // Use a private subdirectory: recursively deleting the base directory
+      // itself would wipe all of test.build.data (or /tmp when it is unset).
+      Path baseDir = new Path(System.getProperty("test.build.data","/tmp"));
+      Path testDir = new Path(baseDir, "TestStreamingTaskLog");
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+      fs.mkdirs(testDir);
+      File scriptFile = createScript(
+          testDir.toString() + "/testTaskLog.sh");
+      mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
+
+      writeInputFile(fs, inputPath);
+      map = scriptFile.getAbsolutePath();
+
+      runStreamJobAndValidateEnv();
+
+      fs.delete(outputPath, true);
+      assertFalse("output not cleaned up", fs.exists(outputPath));
+      mr.waitUntilIdle();
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Writes an executable shell script that discards its stdin and echoes the
+   * HADOOP_ROOT_LOGGER and HADOOP_CLIENT_OPTS values it was launched with.
+   * @param script path of the script file to (re)create
+   * @return the script file
+   * @throws IOException if the script cannot be written or made executable
+   */
+  private File createScript(String script) throws IOException {
+    File scriptFile = new File(script);
+    UtilTest.recursiveDelete(scriptFile);
+    FileOutputStream out = new FileOutputStream(scriptFile);
+    try {
+      String body = "cat > /dev/null 2>&1\n" +
+                    "echo $HADOOP_ROOT_LOGGER $HADOOP_CLIENT_OPTS";
+      out.write(body.getBytes("UTF-8"));
+    } finally {
+      out.close(); // release the descriptor even if the write fails
+    }
+
+    Shell.execCommand(new String[]{"chmod", "+x",
+                                   scriptFile.getAbsolutePath()});
+    return scriptFile;
+  }
+
+  /** Writes {@link #input} to a file named part0 under dir. */
+  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
+    DataOutputStream out = fs.create(new Path(dir, "part0"));
+    try {
+      out.writeBytes(input);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Runs the streaming job and validates that the environment echoed by the
+   * mapper script carries the task-log settings.
+   * @throws IOException
+   */
+  private void runStreamJobAndValidateEnv() throws IOException {
+    boolean mayExit = false;
+    StreamJob job = new StreamJob(genArgs(), mayExit);
+    int returnStatus = job.go();
+    assertEquals("StreamJob failed.", 0, returnStatus);
+
+    // validate environment variables set for the child(script) of java process
+    String env = TestMiniMRWithDFS.readOutput(outputPath, mr.createJobConf());
+    long logSize = USERLOG_LIMIT_KB * 1024;
+    assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
+               && env.contains("-Dhadoop.tasklog.taskid=attempt_")
+               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+  }
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=826979&r1=826978&r2=826979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Oct 20 07:11:41 2009
@@ -59,13 +59,9 @@
   private static final File LOG_DIR = 
     new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
   
+  // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
   static {
-    try {
-      localFS = FileSystem.getLocal(new Configuration());
-    } catch (IOException ioe) {
-      LOG.warn("Getting local file system failed.");
-    }
     if (!LOG_DIR.exists()) {
       boolean b = LOG_DIR.mkdirs();
       if (!b) {
@@ -200,6 +196,10 @@
     File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+    if (localFS == null) {// set localFS once
+      localFS = FileSystem.getLocal(new Configuration());
+    }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
   private static void resetPrevLengths(TaskAttemptID firstTaskid) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=826979&r1=826978&r2=826979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 20 07:11:41 2009
@@ -210,7 +210,8 @@
           stderr);
 
       Map<String, String> env = new HashMap<String, String>();
-      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
+      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
+                                   taskid, logSize);
 
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
@@ -484,6 +485,7 @@
   }
 
   /**
+   * sets the environment variables needed for task jvm and its children.
    * @param errorInfo
    * @param workDir
    * @param env
@@ -491,7 +493,7 @@
    * @throws Throwable
    */
   private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
-      Map<String, String> env)
+      Map<String, String> env, TaskAttemptID taskid, long logSize)
       throws Throwable {
     StringBuffer ldLibraryPath = new StringBuffer();
     ldLibraryPath.append(workDir.toString());
@@ -503,6 +505,18 @@
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     
+    // for the child of task jvm, set hadoop.root.logger
+    env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
     // add the env variables passed by the user
     String mapredChildEnv = getChildEnv(conf);
     if (mapredChildEnv != null && mapredChildEnv.length() > 0) {