You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/10/20 09:11:41 UTC
svn commit: r826979 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
src/java/org/apache/hadoop/mapred/TaskLog.java
src/java/org/apache/hadoop/mapred/TaskRunner.java
Author: yhemanth
Date: Tue Oct 20 07:11:41 2009
New Revision: 826979
URL: http://svn.apache.org/viewvc?rev=826979&view=rev
Log:
MAPREDUCE-1086. Setup Hadoop logging environment for tasks to point to task related parameters. Contributed by Ravi Gummadi.
Added:
hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=826979&r1=826978&r2=826979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Oct 20 07:11:41 2009
@@ -794,3 +794,7 @@
MAPREDUCE-1070. Prevent a deadlock in the fair scheduler servlet.
(Todd Lipcon via cdouglas)
+
+ MAPREDUCE-1086. Setup Hadoop logging environment for tasks to point to
+ task related parameters. (Ravi Gummadi via yhemanth)
+
Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java?rev=826979&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingTaskLog.java Tue Oct 20 07:11:41 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.TestMiniMRWithDFS;
+import org.apache.hadoop.util.Shell;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the environment that the TT sets up for child processes of the task jvm.
+ * Launches a streaming job whose mapper is a shell script that echoes that environment.
+ */
+public class TestStreamingTaskLog extends TestCase {
+ String input = "the dummy input";
+ Path inputPath = new Path("inDir");
+ Path outputPath = new Path("outDir");
+ String map = null;
+ MiniMRCluster mr = null;
+ FileSystem fs = null;
+ final long USERLOG_LIMIT_KB = 5;// userlog limit in KB given to the job; asserted below in bytes
+
+ String[] genArgs() {
+ return new String[] {
+ "-input", inputPath.toString(),
+ "-output", outputPath.toString(),
+ "-mapper", map,
+ "-reducer", StreamJob.REDUCE_NONE,
+ "-jobconf", "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort(),
+ "-jobconf", "fs.default.name=" + fs.getUri().toString(),
+ "-jobconf", "mapred.map.tasks=1",
+ "-jobconf", "keep.failed.task.files=true",
+ "-jobconf", "mapreduce.task.userlog.limit.kb=" + USERLOG_LIMIT_KB, // the limit the test later looks for in the child's env
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ };
+ }
+
+ /**
+ * Validates that children of java tasks in streaming jobs see
+ * HADOOP_ROOT_LOGGER set to 'INFO,TLA' together with the dependent properties
+ * (a) hadoop.tasklog.taskid and
+ * (b) hadoop.tasklog.totalLogFileSize,
+ * by running a script mapper that echoes HADOOP_ROOT_LOGGER and HADOOP_CLIENT_OPTS.
+ */
+ public void testStreamingTaskLogWithHadoopCmd() {
+ try {
+ final int numSlaves = 1;
+ Configuration conf = new Configuration();
+
+ fs = FileSystem.getLocal(conf);
+ Path testDir = new Path(System.getProperty("test.build.data","/tmp"));
+ if (fs.exists(testDir)) { // start from an empty test directory
+ fs.delete(testDir, true);
+ }
+ fs.mkdirs(testDir);
+ File scriptFile = createScript(
+ testDir.toString() + "/testTaskLog.sh");
+ mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
+
+ writeInputFile(fs, inputPath);
+ map = scriptFile.getAbsolutePath();
+
+ runStreamJobAndValidateEnv();
+
+ fs.delete(outputPath, true);
+ assertFalse("output not cleaned up", fs.exists(outputPath));
+ mr.waitUntilIdle();
+ } catch(IOException e) {
+ fail(e.toString()); // NOTE(review): only e.toString() is reported; the stack trace is dropped
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+ }
+
+ private File createScript(String script) throws IOException {
+ File scriptFile = new File(script);
+ UtilTest.recursiveDelete(scriptFile);
+ FileOutputStream in = new FileOutputStream(scriptFile); // NOTE(review): an output stream despite the name; not closed if write() throws
+ in.write(("cat > /dev/null 2>&1\n" + // script: discard stdin, then echo the two env vars under test
+ "echo $HADOOP_ROOT_LOGGER $HADOOP_CLIENT_OPTS").getBytes());
+ in.close();
+
+ Shell.execCommand(new String[]{"chmod", "+x", // make the script executable
+ scriptFile.getAbsolutePath()});
+ return scriptFile;
+ }
+
+ private void writeInputFile(FileSystem fs, Path dir) throws IOException {
+ DataOutputStream out = fs.create(new Path(dir, "part0")); // the dummy input goes to <dir>/part0
+ out.writeBytes(input);
+ out.close();
+ }
+
+ /**
+ * Runs the streaming job and validates the environment echoed into its output.
+ * @throws IOException if a call made while running the job or reading its output fails
+ */
+ private void runStreamJobAndValidateEnv() throws IOException {
+ int returnStatus = -1;
+ boolean mayExit = false;
+ StreamJob job = new StreamJob(genArgs(), mayExit);
+ returnStatus = job.go();
+ assertEquals("StreamJob failed.", 0, returnStatus);
+
+ // the mapper script echoed its environment into the job output; validate it here
+ String env = TestMiniMRWithDFS.readOutput(outputPath, mr.createJobConf());
+ long logSize = USERLOG_LIMIT_KB * 1024; // KB -> bytes, as expected in totalLogFileSize
+ assertTrue("environment set for child is wrong", env.contains("INFO,TLA")
+ && env.contains("-Dhadoop.tasklog.taskid=attempt_")
+ && env.contains("-Dhadoop.tasklog.totalLogFileSize=" + logSize));
+ }
+}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=826979&r1=826978&r2=826979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Oct 20 07:11:41 2009
@@ -59,13 +59,9 @@
private static final File LOG_DIR =
new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
+ // localFS is set in (and used by) writeToIndexFile()
static LocalFileSystem localFS = null;
static {
- try {
- localFS = FileSystem.getLocal(new Configuration());
- } catch (IOException ioe) {
- LOG.warn("Getting local file system failed.");
- }
if (!LOG_DIR.exists()) {
boolean b = LOG_DIR.mkdirs();
if (!b) {
@@ -200,6 +196,10 @@
File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
Path indexFilePath = new Path(indexFile.getAbsolutePath());
Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
+
+ if (localFS == null) {// set localFS once
+ localFS = FileSystem.getLocal(new Configuration());
+ }
localFS.rename (tmpIndexFilePath, indexFilePath);
}
private static void resetPrevLengths(TaskAttemptID firstTaskid) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=826979&r1=826978&r2=826979&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Oct 20 07:11:41 2009
@@ -210,7 +210,8 @@
stderr);
Map<String, String> env = new HashMap<String, String>();
- errorInfo = getVMEnvironment(errorInfo, workDir, conf, env);
+ errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
+ taskid, logSize);
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
@@ -484,6 +485,7 @@
}
/**
+ * sets the environment variables needed for task jvm and its children.
* @param errorInfo
* @param workDir
* @param env
@@ -491,7 +493,7 @@
* @throws Throwable
*/
private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
- Map<String, String> env)
+ Map<String, String> env, TaskAttemptID taskid, long logSize)
throws Throwable {
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
@@ -503,6 +505,18 @@
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+ // for the child of task jvm, set hadoop.root.logger
+ env.put("HADOOP_ROOT_LOGGER", "INFO,TLA");
+ String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+ if (hadoopClientOpts == null) {
+ hadoopClientOpts = "";
+ } else {
+ hadoopClientOpts = hadoopClientOpts + " ";
+ }
+ hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+ + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+ env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
// add the env variables passed by the user
String mapredChildEnv = getChildEnv(conf);
if (mapredChildEnv != null && mapredChildEnv.length() > 0) {