You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:31:44 UTC

svn commit: r1077030 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/streaming/src/test/org/apache/hadoop/streaming/ mapred/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:31:44 2011
New Revision: 1077030

URL: http://svn.apache.org/viewvc?rev=1077030&view=rev
Log:
commit c17f63f6707117e4d07c2768a8836216d05fee08
Author: Hemanth Yamijala <yh...@apache.org>
Date:   Wed Oct 21 11:32:59 2009 +0530

    MAPREDUCE-1086 from https://issues.apache.org/jira/secure/attachment/12422677/MR-1086-yhadoop20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1086. Setup Hadoop logging environment for tasks to point to
    +    task related parameters. (Ravi Gummadi via yhemanth)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=1077030&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Fri Mar  4 03:31:44 2011
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.Shell;
+
+import junit.framework.TestCase;
+
+/**
+ * This tests the environment that the TT sets up for children of the task JVM.
+ * It launches a streaming job that uses a shell script as the mapper.
+ */
+public class TestStreamingTaskLog extends TestCase {
+  String input = "the dummy input";
+  Path inputPath = new Path("inDir");
+  Path outputPath = new Path("outDir");
+  String map = null;
+  MiniMRCluster mr = null;
+  FileSystem fs = null;
+  final long USERLOG_LIMIT_KB = 5;//consider 5kb as logSize
+
+  String[] genArgs() {
+    return new String[] {
+      "-input", inputPath.toString(),
+      "-output", outputPath.toString(),
+      "-mapper", map,
+      "-reducer", StreamJob.REDUCE_NONE,
+      "-jobconf", "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort(),
+      "-jobconf", "fs.default.name=" + fs.getUri().toString(),
+      "-jobconf", "mapred.map.tasks=1",
+      "-jobconf", "keep.failed.task.files=true",
+      "-jobconf", "mapred.userlog.limit.kb=" + USERLOG_LIMIT_KB,
+      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+    };
+  }
+
+  /**
+   * This test validates the setting of HADOOP_ROOT_LOGGER to 'INFO,TLA' and the
+   * dependent properties
+   *  (a) hadoop.tasklog.taskid and
+   *  (b) hadoop.tasklog.totalLogFileSize
+   * for the children of java tasks in streaming jobs.
+   */
+  public void testStreamingTaskLogWithHadoopCmd() {
+    try {
+      final int numSlaves = 1;
+      Configuration conf = new Configuration();
+
+      fs = FileSystem.getLocal(conf);
+      Path testDir = new Path(System.getProperty("test.build.data","/tmp"));
+      if (fs.exists(testDir)) {
+        fs.delete(testDir, true);
+      }
+      fs.mkdirs(testDir);
+      File scriptFile = createScript(
+          testDir.toString() + "/testTaskLog.sh");
+      mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
+      
+      writeInputFile(fs, inputPath);
+      map = scriptFile.getAbsolutePath();
+      
+      runStreamJobAndValidateEnv();
+      
+      fs.delete(outputPath, true);
+      assertFalse("output not cleaned up", fs.exists(outputPath));
+      mr.waitUntilIdle();
+    } catch(IOException e) {
+      fail(e.toString());
+    } finally {
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  private File createScript(String script) throws IOException {
+    File scriptFile = new File(script);
+    UtilTest.recursiveDelete(scriptFile);
+    FileOutputStream in = new FileOutputStream(scriptFile);
+    in.write(("cat > /dev/null 2>&1\n" +
+              "echo $HADOOP_ROOT_LOGGER $HADOOP_CLIENT_OPTS").getBytes());
+    in.close();
+    
+    Shell.execCommand(new String[]{"chmod", "+x",
+                                   scriptFile.getAbsolutePath()});
+    return scriptFile;
+  }
+  
+  private void writeInputFile(FileSystem fs, Path dir) throws IOException {
+    DataOutputStream out = fs.create(new Path(dir, "part0"));
+    out.writeBytes(input);
+    out.close();
+  }
+
+  /**
+   * Runs the streaming job and validates the output.
+   * @throws IOException
+   */
+  private void runStreamJobAndValidateEnv() throws IOException {
+    int returnStatus = -1;
+    boolean mayExit = false;
+    StreamJob job = new StreamJob(genArgs(), mayExit);
+    returnStatus = job.go();
+    assertEquals("StreamJob failed.", 0, returnStatus);
+    
+    // validate the env variables set for the child (the script) of the java process
+    String env = TestMiniMRWithDFS.readOutput(outputPath, mr.createJobConf());
+    long logSize = USERLOG_LIMIT_KB * 1024;
+    assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
+               && env.contains("-Dhadoop.tasklog.taskid=attempt_")
+               && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077030&r1=1077029&r2=1077030&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 03:31:44 2011
@@ -58,13 +58,9 @@ public class TaskLog {
     new File(System.getProperty("hadoop.log.dir"), 
              "userlogs").getAbsoluteFile();
   
+  // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
   static {
-    try {
-      localFS = FileSystem.getLocal(new Configuration());
-    } catch (IOException ioe) {
-      LOG.warn("Getting local file system failed.");
-    }
     if (!LOG_DIR.exists()) {
       LOG_DIR.mkdirs();
     }
@@ -192,6 +188,10 @@ public class TaskLog {
     File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     Path indexFilePath = new Path(indexFile.getAbsolutePath());
     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+    if (localFS == null) {// set localFS once
+      localFS = FileSystem.getLocal(new Configuration());
+    }
     localFS.rename (tmpIndexFilePath, indexFilePath);
   }
   private static void resetPrevLengths(TaskAttemptID firstTaskid) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077030&r1=1077029&r2=1077030&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:31:44 2011
@@ -447,6 +447,18 @@ abstract class TaskRunner extends Thread
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+
+      // for children of the task JVM, export HADOOP_ROOT_LOGGER and the tasklog -D options
+      env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
+      String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+      if (hadoopClientOpts == null) {
+        hadoopClientOpts = "";
+      } else {
+        hadoopClientOpts = hadoopClientOpts + " ";
+      }
+      hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                         + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+      env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
       
       // add the env variables passed by the user
       String mapredChildEnv = getChildEnv(conf);