You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/09/19 09:31:42 UTC
svn commit: r696957 [1/2] - in /hadoop/core/trunk: ./ conf/
src/core/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Author: acmurthy
Date: Fri Sep 19 00:31:41 2008
New Revision: 696957
URL: http://svn.apache.org/viewvc?rev=696957&view=rev
Log:
HADOOP-249. Reuse JVMs across Map-Reduce Tasks. Contributed by Devaraj Das.
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 00:31:41 2008
@@ -180,6 +180,11 @@
HADOOP-4176. Implement getFileChecksum(Path) in HftpFileSystem. (szetszwo)
+ HADOOP-249. Reuse JVMs across Map-Reduce Tasks.
+ Configuration changes to hadoop-default.xml:
+ add mapred.job.reuse.jvm.num.tasks
+ (Devaraj Das via acmurthy)
+
IMPROVEMENTS
HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Sep 19 00:31:41 2008
@@ -957,6 +957,14 @@
</property>
<property>
+ <name>mapred.job.reuse.jvm.num.tasks</name>
+ <value>1</value>
+ <description>How many tasks to run per jvm. If set to -1, there is
+ no limit.
+ </description>
+</property>
+
+<property>
<name>mapred.min.split.size</name>
<value>0</value>
<description>The minimum size chunk that map input should be split
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Fri Sep 19 00:31:41 2008
@@ -152,6 +152,42 @@
boolean isArchive, long confFileStamp,
Path currentWorkDir)
throws IOException {
+ return getLocalCache(cache, conf, baseDir, fileStatus, isArchive,
+ confFileStamp, currentWorkDir, true);
+ }
+ /**
+ * Get the locally cached file or archive; it could either be
+ * previously cached (and valid) or copy it from the {@link FileSystem} now.
+ *
+ * @param cache the cache to be localized, this should be specified as
+ * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
+ * or hostname:port is provided the file is assumed to be in the filesystem
+ * being used in the Configuration
+ * @param conf The Configuration file which contains the filesystem
+ * @param baseDir The base cache Dir where you want to localize the files/archives
+ * @param fileStatus The file status on the dfs.
+ * @param isArchive if the cache is an archive or a file. In case it is an
+ * archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+ * be unzipped/unjarred/untarred automatically
+ * and the directory where the archive is unzipped/unjarred/untarred is
+ * returned as the Path.
+ * In case of a file, the path to the file is returned
+ * @param confFileStamp this is the hdfs file modification timestamp to verify that the
+ * file to be cached hasn't changed since the job started
+ * @param currentWorkDir this is the directory where you would want to create symlinks
+ * for the locally cached files/archives
+ * @param honorSymLinkConf if this is false, then the symlinks are not
+ * created even if conf says so (this is required for an optimization in task
+ * launches)
+ * @return the path to directory where the archives are unjarred in case of archives,
+ * the path to the file where the file is copied locally
+ * @throws IOException
+ */
+ public static Path getLocalCache(URI cache, Configuration conf,
+ Path baseDir, FileStatus fileStatus,
+ boolean isArchive, long confFileStamp,
+ Path currentWorkDir, boolean honorSymLinkConf)
+ throws IOException {
String cacheId = makeRelative(cache, conf);
CacheStatus lcacheStatus;
Path localizedPath;
@@ -162,10 +198,10 @@
lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
cachedArchives.put(cacheId, lcacheStatus);
}
-
+
synchronized (lcacheStatus) {
localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
- fileStatus, isArchive, currentWorkDir);
+ fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
lcacheStatus.refcount++;
}
}
@@ -180,6 +216,7 @@
}
return localizedPath;
}
+
/**
* Get the locally cached file or archive; it could either be
@@ -292,9 +329,9 @@
CacheStatus cacheStatus,
FileStatus fileStatus,
boolean isArchive,
- Path currentWorkDir)
+ Path currentWorkDir,boolean honorSymLinkConf)
throws IOException {
- boolean doSymlink = getSymlink(conf);
+ boolean doSymlink = honorSymLinkConf && getSymlink(conf);
if(cache.getFragment() == null) {
doSymlink = false;
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.log4j.LogManager;
+
+/**
+ * The main() for child processes.
+ *
+ * The process connects to the {@link TaskUmbilicalProtocol} endpoint given on
+ * the command line and then repeatedly asks it for a task via getTask(),
+ * running each task it receives. The loop ends when the reply says the JVM
+ * should die, when the JVM has polled MAX_SLEEP_COUNT times in a row without
+ * getting a task, or when the per-JVM task limit read from the job
+ * configuration has been reached.
+ */
+
+public class Child {
+
+ // NOTE(review): the logger is created for TaskTracker.class, not Child.class,
+ // so messages from this process are logged under the TaskTracker category --
+ // confirm this is intended.
+ public static final Log LOG =
+ LogFactory.getLog(TaskTracker.class);
+
+ // Id of the task currently being run by this JVM. It is written by main()
+ // and read by the syncLogs thread started below.
+ static volatile TaskAttemptID taskid;
+
+ /**
+ * Entry point of the task JVM.
+ *
+ * @param args args[0] is the umbilical host, args[1] the umbilical port,
+ * args[2] the id of the first task attempt and args[3] the integer id of
+ * this JVM
+ */
+ public static void main(String[] args) throws Throwable {
+ LOG.debug("Child starting");
+
+ JobConf defaultConf = new JobConf();
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+ taskid = firstTaskid;
+ int jvmIdInt = Integer.parseInt(args[3]);
+ JVMId jvmId = new JVMId(taskid.getJobID(),taskid.isMap(),jvmIdInt);
+ // 600 polls with a 500 ms sleep between them (see the loop below)
+ final int MAX_SLEEP_COUNT = 600; //max idle time of 5 minutes
+ int sleepCount = 0;
+ TaskUmbilicalProtocol umbilical =
+ (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+ TaskUmbilicalProtocol.versionID,
+ address,
+ defaultConf);
+ int numTasksToExecute = -1; //-1 signifies "no limit"
+ int numTasksExecuted = 0;
+ Thread t = new Thread() {
+ public void run() {
+ //every so often wake up and syncLogs so that we can track
+ //logs of the currently running task
+ while (true) {
+ try {
+ Thread.sleep(5000);
+ TaskLog.syncLogs(firstTaskid, taskid);
+ } catch (InterruptedException ie) {
+ // an interrupt is ignored; the loop simply goes back to sleep
+ } catch (IOException iee) {
+ // failing to sync the logs terminates the whole JVM
+ LOG.error("Error in syncLogs: " + iee);
+ System.exit(-1);
+ }
+ }
+ }
+ };
+ t.setName("Thread for syncLogs");
+ t.setDaemon(true);
+ t.start();
+ //for the memory management, a PID file is written and the PID file
+ //is written once per JVM. We simply symlink the file on a per task
+ //basis later (see below). Long term, we should change the Memory
+ //manager to use JVMId instead of TaskAttemptId
+ Path srcPidPath = null;
+ Path dstPidPath = null;
+ try {
+ while (true) {
+ JvmTask myTask = umbilical.getTask(jvmId, firstTaskid);
+ if (myTask.shouldDie()) {
+ break;
+ } else {
+ if (myTask.getTask() == null) {
+ // no task available right now: poll again after 500 ms, giving up
+ // once MAX_SLEEP_COUNT consecutive polls came back empty.
+ // System.exit terminates the JVM here, so the cleanup in the
+ // outer finally block is not run on this path.
+ if (sleepCount == MAX_SLEEP_COUNT) {
+ System.exit(0);
+ }
+ sleepCount++;
+ Thread.sleep(500);
+ continue;
+ }
+ sleepCount = 0; //got a task. reset the sleepCount
+ }
+ Task task = myTask.getTask();
+ taskid = task.getTaskID();
+
+ //create the index file so that the log files
+ //are viewable immediately
+ TaskLog.syncLogs(firstTaskid, taskid);
+ JobConf job = new JobConf(task.getJobFile());
+ if (job.getBoolean("task.memory.mgmt.enabled", false)) {
+ // the pid file path is looked up once, for the first task of this JVM
+ if (srcPidPath == null) {
+ srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
+ job);
+ }
+ //since the JVM is running multiple tasks potentially, we need
+ //to do symlink stuff only for the subsequent tasks
+ if (!taskid.equals(firstTaskid)) {
+ dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+ FileUtil.symLink(srcPidPath.toUri().getPath(),
+ dstPidPath.toUri().getPath());
+ }
+ }
+ //setupWorkDir actually sets up the symlinks for the distributed
+ //cache. After a task exits we wipe the workdir clean, and hence
+ //the symlinks have to be rebuilt.
+ TaskRunner.setupWorkDir(job);
+
+ numTasksToExecute = job.getNumTasksToExecutePerJvm();
+ assert(numTasksToExecute != 0);
+ TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
+
+ task.setConf(job);
+
+ defaultConf.addResource(new Path(task.getJobFile()));
+
+ // Initiate Java VM metrics
+ JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
+ // use job-specified working directory
+ FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+ try {
+ task.run(job, umbilical); // run the task
+ } finally {
+ // sync the logs once more and remove the per-task pid symlink that
+ // was created above for tasks other than the first one
+ TaskLog.syncLogs(firstTaskid, taskid);
+ if (!taskid.equals(firstTaskid) &&
+ job.getBoolean("task.memory.mgmt.enabled", false)) {
+ new File(dstPidPath.toUri().getPath()).delete();
+ }
+ }
+ // stop once the configured number of tasks has been run; a limit of
+ // -1 (or any non-positive value) never triggers this exit
+ if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
+ break;
+ }
+ }
+ } catch (FSError e) {
+ LOG.fatal("FSError from child", e);
+ umbilical.fsError(taskid, e.getMessage());
+ } catch (Throwable throwable) {
+ LOG.warn("Error running child", throwable);
+ // Report back any failures, for diagnostic purposes
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ throwable.printStackTrace(new PrintStream(baos));
+ umbilical.reportDiagnosticInfo(taskid, baos.toString());
+ } finally {
+ RPC.stopProxy(umbilical);
+ MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+ metricsContext.close();
+ // Shutting down log4j of the child-vm...
+ // This assumes that on return from Task.run()
+ // there is no more logging done.
+ LogManager.shutdown();
+ }
+ }
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Sep 19 00:31:41 2008
@@ -47,8 +47,9 @@
* Version 15: Changed format of Task and TaskStatus for HADOOP-153
* Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
* Version 17: Changed format of Task and TaskStatus for HADOOP-3150
+ * Version 18: Changed status message due to changes in TaskStatus
*/
- public static final long versionID = 17L;
+ public static final long versionID = 18L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Sep 19 00:31:41 2008
@@ -35,6 +35,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JvmTask;
public class IsolationRunner {
private static final Log LOG =
@@ -58,7 +59,7 @@
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
- public Task getTask(TaskAttemptID taskid) throws IOException {
+ public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) throws IOException {
return null;
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+/**
+ * Identifier of a task JVM. It is made up of the job the JVM belongs to,
+ * whether the JVM runs map or reduce tasks, and an integer id. The string
+ * form is <code>jvm_&lt;jtIdentifier&gt;_&lt;jobNumber&gt;_&lt;m|r&gt;_&lt;id&gt;</code>.
+ */
+public class JVMId extends ID {
+  boolean isMap;
+  JobID jobId;
+  private static final String JVM = "jvm";
+  private static final char UNDERSCORE = '_';
+  // NumberFormat instances are not safe for concurrent use; every use of this
+  // shared instance goes through toStringWOPrefix(), which locks on it.
+  private static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  /**
+   * @param jobId the job this JVM belongs to
+   * @param isMap true for a JVM that runs map tasks, false for reduce tasks
+   * @param id the integer id of the JVM
+   */
+  public JVMId(JobID jobId, boolean isMap, int id) {
+    super(id);
+    this.isMap = isMap;
+    this.jobId = jobId;
+  }
+
+  /**
+   * @param jtIdentifier the identifier part of the job id
+   * @param jobId the numeric part of the job id
+   * @param isMap true for a JVM that runs map tasks, false for reduce tasks
+   * @param id the integer id of the JVM
+   */
+  public JVMId (String jtIdentifier, int jobId, boolean isMap, int id) {
+    this(new JobID(jtIdentifier, jobId), isMap, id);
+  }
+
+  // used by read(DataInput); the fields are filled in by readFields()
+  private JVMId() { }
+
+  public boolean isMapJVM() {
+    return isMap;
+  }
+  public JobID getJobId() {
+    return jobId;
+  }
+
+  /** Two JVMIds are equal when id, map/reduce flag and job id all match. */
+  @Override
+  public boolean equals(Object o) {
+    if (o == null) {
+      return false;
+    }
+    if (o.getClass().equals(JVMId.class)) {
+      JVMId that = (JVMId)o;
+      return this.id == that.id
+        && this.isMap == that.isMap
+        && this.jobId.equals(that.jobId);
+    }
+    return false;
+  }
+
+  /**Compare JVMIds first by jobId, then by map/reduce type, then by the
+   * integer id. Reduce JVMs are defined as greater than map JVMs.*/
+  @Override
+  public int compareTo(ID o) {
+    JVMId that = (JVMId)o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    if (jobComp != 0) {
+      return jobComp;
+    }
+    if (this.isMap != that.isMap) {
+      return this.isMap ? -1 : 1;
+    }
+    // Compare explicitly instead of subtracting: the ids may be arbitrary
+    // ints (including negative ones), for which "this.id - that.id" can
+    // overflow and report the wrong sign.
+    if (this.id < that.id) {
+      return -1;
+    }
+    return (this.id == that.id) ? 0 : 1;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    return builder.append(JVM).append(UNDERSCORE)
+      .append(toStringWOPrefix()).toString();
+  }
+
+  /** Builds the string form without the leading "jvm_" prefix. */
+  StringBuilder toStringWOPrefix() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(jobId.toStringWOPrefix())
+      .append(isMap ? "_m_" : "_r_");
+    synchronized (idFormat) {
+      builder.append(idFormat.format(id));
+    }
+    return builder;
+  }
+
+  @Override
+  public int hashCode() {
+    return toStringWOPrefix().toString().hashCode();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jobId = JobID.read(in);
+    this.isMap = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+    out.writeBoolean(isMap);
+  }
+
+  /** Reads a JVMId that was written with {@link #write(DataOutput)}. */
+  public static JVMId read(DataInput in) throws IOException {
+    JVMId jvmId = new JVMId();
+    jvmId.readFields(in);
+    return jvmId;
+  }
+
+  /** Construct a JVMId object from given string
+   * @param str a string of the form jvm_jtIdentifier_jobNumber_m|r_id
+   * @return constructed JVMId object or null if the given String is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static JVMId forName(String str)
+    throws IllegalArgumentException {
+    if (str == null) {
+      return null;
+    }
+    try {
+      String[] parts = str.split("_");
+      if (parts.length == 5 && parts[0].equals(JVM)) {
+        boolean isMap = parts[3].equals("m");
+        if (isMap || parts[3].equals("r")) {
+          return new JVMId(parts[1], Integer.parseInt(parts[2]),
+                           isMap, Integer.parseInt(parts[4]));
+        }
+      }
+    } catch (Exception ex) {
+      // any failure while parsing is reported as a malformed string below
+    }
+    throw new IllegalArgumentException("JVMId string : " + str
+        + " is not properly formed");
+  }
+
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Sep 19 00:31:41 2008
@@ -326,6 +326,23 @@
}
/**
+ * Sets the number of tasks that a spawned task JVM should run
+ * before it exits. The value is stored under the
+ * mapred.job.reuse.jvm.num.tasks key.
+ * @param numTasks the number of tasks to execute per JVM;
+ * -1 signifies no limit
+ */
+ public void setNumTasksToExecutePerJvm(int numTasks) {
+ setInt("mapred.job.reuse.jvm.num.tasks", numTasks);
+ }
+
+ /**
+ * Get the number of tasks that a spawned JVM should execute.
+ * @return the value read from mapred.job.reuse.jvm.num.tasks, with 1
+ * passed as the default
+ */
+ public int getNumTasksToExecutePerJvm() {
+ return getInt("mapred.job.reuse.jvm.num.tasks", 1);
+ }
+
+ /**
* Get the {@link InputFormat} implementation for the map-reduce job,
* defaults to {@link TextInputFormat} if not specified explicity.
*
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep 19 00:31:41 2008
@@ -1415,7 +1415,9 @@
for (TaskInProgress tip : job.getMapTasks()) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
- taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+ taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskID());
}
@@ -1424,7 +1426,9 @@
for (TaskInProgress tip : job.getReduceTasks()) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
- taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+ taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskID());
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * Keeps track of the task JVMs spawned on behalf of a {@link TaskTracker}.
+ * Map and reduce JVMs are handled by two separate {@link JvmManagerForType}
+ * instances; the public methods here only dispatch to the right one.
+ */
+class JvmManager {
+
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.mapred.JvmManager");
+
+  JvmManagerForType mapJvmManager;
+
+  JvmManagerForType reduceJvmManager;
+
+  TaskTracker tracker;
+
+  /** Bundles everything needed to launch a JVM into a {@link JvmEnv}. */
+  public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
+      File stdout,File stderr,long logSize, File workDir,
+      Map<String,String> env, String pidFile, JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
+  }
+
+  /**
+   * @param tracker the tracker whose map/reduce slot limits bound the number
+   * of JVMs of each type
+   */
+  public JvmManager(TaskTracker tracker) {
+    mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
+        true, tracker);
+    reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
+        false, tracker);
+    this.tracker = tracker;
+  }
+
+  /** Kills every JVM known to either manager. */
+  public void stop() {
+    mapJvmManager.stop();
+    reduceJvmManager.stop();
+  }
+
+  public boolean isJvmKnown(JVMId jvmId) {
+    if (jvmId.isMapJVM()) {
+      return mapJvmManager.isJvmknown(jvmId);
+    } else {
+      return reduceJvmManager.isJvmknown(jvmId);
+    }
+  }
+
+  /**
+   * Asks the manager of the given type to make room for, and spawn, a JVM
+   * for the job. No JVM is spawned when all slots are taken and none of the
+   * existing JVMs can be killed.
+   */
+  public void launchJvm(JobID jobId, boolean isMap, JvmEnv env) {
+    if (isMap) {
+      mapJvmManager.reapJvm(env, jobId, tracker);
+    } else {
+      reduceJvmManager.reapJvm(env, jobId, tracker);
+    }
+  }
+
+  public void setRunningTaskForJvm(JVMId jvmId, TaskRunner t) {
+    if (jvmId.isMapJVM()) {
+      mapJvmManager.setRunningTaskForJvm(jvmId, t);
+    } else {
+      reduceJvmManager.setRunningTaskForJvm(jvmId, t);
+    }
+  }
+
+  public void taskFinished(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.taskFinished(tr);
+    } else {
+      reduceJvmManager.taskFinished(tr);
+    }
+  }
+
+  public void taskKilled(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.taskKilled(tr);
+    } else {
+      reduceJvmManager.taskKilled(tr);
+    }
+  }
+
+  public void killJvm(JVMId jvmId) {
+    // use the accessor, like the other dispatch methods of this class
+    if (jvmId.isMapJVM()) {
+      mapJvmManager.killJvm(jvmId);
+    } else {
+      reduceJvmManager.killJvm(jvmId);
+    }
+  }
+
+  /**
+   * Book-keeping for the JVMs of one type (map or reduce). All access to the
+   * three maps goes through synchronized methods of this class.
+   */
+  private static class JvmManagerForType {
+    //Mapping from the JVM IDs to running Tasks
+    Map <JVMId,TaskRunner> jvmToRunningTask =
+      new HashMap<JVMId, TaskRunner>();
+    //Mapping from the tasks to JVM IDs
+    Map <TaskRunner,JVMId> runningTaskToJvm =
+      new HashMap<TaskRunner, JVMId>();
+    //Mapping from the JVM IDs to the runner threads of the JVM processes
+    Map <JVMId, JvmRunner> jvmIdToRunner =
+      new HashMap<JVMId, JvmRunner>();
+    int maxJvms;
+    boolean isMap;
+
+    // source of the integer part of newly created JVM ids
+    Random rand = new Random(System.currentTimeMillis());
+    TaskTracker tracker;
+
+    public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+      this.maxJvms = maxJvms;
+      this.isMap = isMap;
+      this.tracker = tracker;
+    }
+
+    /**
+     * Records that the JVM is now running the given task and marks it busy.
+     * A null task only marks the JVM as not busy.
+     */
+    synchronized public void setRunningTaskForJvm(JVMId jvmId,
+        TaskRunner t) {
+      if (t == null) {
+        //signifies the JVM asked for a task and it
+        //was not given anything.
+        jvmIdToRunner.get(jvmId).setBusy(false);
+        return;
+      }
+      jvmToRunningTask.put(jvmId, t);
+      runningTaskToJvm.put(t,jvmId);
+      jvmIdToRunner.get(jvmId).setBusy(true);
+    }
+
+    synchronized public boolean isJvmknown(JVMId jvmId) {
+      return jvmIdToRunner.containsKey(jvmId);
+    }
+
+    /** Drops the task/JVM association and counts the task as run. */
+    synchronized public void taskFinished(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.remove(tr);
+      if (jvmId != null) {
+        jvmToRunningTask.remove(jvmId);
+        JvmRunner jvmRunner;
+        if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+          jvmRunner.taskRan();
+        }
+      }
+    }
+
+    /** Drops the task/JVM association and kills the JVM that ran the task. */
+    synchronized public void taskKilled(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.remove(tr);
+      if (jvmId != null) {
+        jvmToRunningTask.remove(jvmId);
+        killJvm(jvmId);
+      }
+    }
+
+    synchronized public void killJvm(JVMId jvmId) {
+      JvmRunner jvmRunner;
+      if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+        jvmRunner.kill();
+      }
+    }
+
+    synchronized public void stop() {
+      // kill() removes the runner from jvmIdToRunner, so iterate over a
+      // snapshot; iterating the live view would fail with a
+      // ConcurrentModificationException as soon as there is more than one JVM.
+      JvmRunner[] runners =
+        jvmIdToRunner.values().toArray(new JvmRunner[0]);
+      for (JvmRunner jvm : runners) {
+        jvm.kill();
+      }
+    }
+
+    synchronized private void removeJvm(JVMId jvmId) {
+      jvmIdToRunner.remove(jvmId);
+    }
+
+    /**
+     * Spawns a JVM for the job if a slot is free; otherwise kills at most one
+     * existing JVM (see the cases below) and spawns a new one in its place.
+     * Logs and does nothing when no JVM can be killed.
+     */
+    private synchronized void reapJvm(
+        JvmEnv env,
+        JobID jobId, TaskTracker tracker) {
+      boolean spawnNewJvm = false;
+      //Check whether there is a free slot to start a new JVM.
+      //,or, Kill a (idle) JVM and launch a new one
+      int numJvmsSpawned = jvmIdToRunner.size();
+
+      if (numJvmsSpawned >= maxJvms) {
+        //go through the list of JVMs for all jobs.
+        //for each JVM see whether it is currently running something and
+        //if not, then kill the JVM
+        Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
+          jvmIdToRunner.entrySet().iterator();
+
+        while (jvmIter.hasNext()) {
+          JvmRunner jvmRunner = jvmIter.next().getValue();
+          JobID jId = jvmRunner.jvmId.getJobId();
+          //Cases when a JVM is killed:
+          // (1) the JVM under consideration belongs to the same job
+          //     (passed in the argument). In this case, kill only when
+          //     the JVM ran all the tasks it was scheduled to run (in terms
+          //     of count).
+          // (2) the JVM under consideration belongs to a different job and is
+          //     currently not busy
+          //
+          if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
+              (!jId.equals(jobId) && !jvmRunner.isBusy())) {
+            // remove through the iterator first; the removeJvm() done inside
+            // kill() then finds nothing left to remove
+            jvmIter.remove();
+            jvmRunner.kill();
+            spawnNewJvm = true;
+            break;
+          }
+        }
+      } else {
+        spawnNewJvm = true;
+      }
+
+      if (spawnNewJvm) {
+        spawnNewJvm(jobId, env, tracker);
+      } else {
+        LOG.info("No new JVM spawned for jobId: " + jobId);
+      }
+    }
+
+    /** Registers a new runner for the job and starts it as a daemon thread. */
+    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker) {
+      JvmRunner jvmRunner = new JvmRunner(env,jobId);
+      jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
+      //spawn the JVM in a new thread. Note that there will be very little
+      //extra overhead of launching the new thread for a new JVM since
+      //most of the cost is involved in launching the process. Moreover,
+      //since we are going to be using the JVM for running many tasks,
+      //the thread launch cost becomes trivial when amortized over all
+      //tasks. Doing it this way also keeps code simple.
+      jvmRunner.setDaemon(true);
+      jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
+      if (tracker.isTaskMemoryManagerEnabled()) {
+        tracker.getTaskMemoryManager().addTask(
+            TaskAttemptID.forName(env.conf.get("mapred.task.id")),
+            tracker.getMemoryForTask(env.conf));
+      }
+      LOG.info(jvmRunner.getName());
+      jvmRunner.start();
+    }
+
+    /**
+     * Called when a JVM process has exited: forgets the JVM and, if a task
+     * was still associated with it, passes on a non-zero exit code (unless
+     * the JVM was killed) and signals the task runner that it is done.
+     */
+    synchronized private void updateOnJvmExit(JVMId jvmId,
+        int exitCode, boolean killed) {
+      removeJvm(jvmId);
+      TaskRunner t = jvmToRunningTask.remove(jvmId);
+
+      if (t != null) {
+        runningTaskToJvm.remove(t);
+        if (!killed && exitCode != 0) {
+          t.setExitCode(exitCode);
+        }
+        t.signalDone();
+      }
+    }
+
+    /** Thread that launches one JVM process and waits for it to exit. */
+    private class JvmRunner extends Thread {
+      JvmEnv env;
+      // NOTE(review): nothing visible in this class ever sets this to true,
+      // so updateOnJvmExit() is always told the JVM was not killed -- confirm.
+      volatile boolean killed = false;
+      volatile int numTasksRan;
+      final int numTasksToRun;
+      JVMId jvmId;
+      // a freshly created runner counts as busy until told otherwise
+      volatile boolean busy = true;
+      private ShellCommandExecutor shexec; // shell terminal for running the task
+      public JvmRunner(JvmEnv env, JobID jobId) {
+        this.env = env;
+        this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
+        this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
+        LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
+      }
+      public void run() {
+        runChild(env);
+      }
+
+      /**
+       * Runs the JVM process to completion and then updates the book-keeping
+       * and cleans up the work directory and pid file.
+       */
+      public void runChild(JvmEnv env) {
+        try {
+          // the integer JVM id is passed to the child as its last argument
+          env.vargs.add(Integer.toString(jvmId.getId()));
+          List<String> wrappedCommand =
+            TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
+                env.logSize, env.pidFile);
+          shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
+              env.workDir, env.env);
+          shexec.execute();
+        } catch (IOException ioe) {
+          // do nothing
+          // error and output are appropriately redirected
+        } finally { // handle the exit code
+          // NOTE(review): when the command could not even be built, this
+          // returns without calling updateOnJvmExit(), so the JVM stays
+          // registered -- confirm whether that is intended.
+          if (shexec == null) {
+            return;
+          }
+          int exitCode = shexec.getExitCode();
+          updateOnJvmExit(jvmId, exitCode, killed);
+          LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
+              numTasksRan);
+          try {
+            //the task jvm cleans up the common workdir for every
+            //task at the beginning of each task in the task JVM.
+            //For the last task, we do it here.
+            if (env.conf.getNumTasksToExecutePerJvm() != 1) {
+              FileUtil.fullyDelete(env.workDir);
+            }
+          } catch (IOException ie){}
+          if (tracker.isTaskMemoryManagerEnabled()) {
+            // Remove the associated pid-file, if any
+            tracker.getTaskMemoryManager().
+              removePidFile(TaskAttemptID.forName(
+                  env.conf.get("mapred.task.id")));
+          }
+        }
+      }
+
+      /** Destroys the process, if one was started, and forgets this JVM. */
+      public void kill() {
+        if (shexec != null) {
+          Process process = shexec.getProcess();
+          if (process != null) {
+            process.destroy();
+          }
+        }
+        removeJvm(jvmId);
+      }
+
+      public void taskRan() {
+        busy = false;
+        numTasksRan++;
+      }
+
+      // never true for the "no limit" setting (-1), since the count of tasks
+      // run is never negative
+      public boolean ranAll() {
+        return(numTasksRan == numTasksToRun);
+      }
+      public void setBusy(boolean busy) {
+        this.busy = busy;
+      }
+      public boolean isBusy() {
+        return busy;
+      }
+    }
+  }
+  static class JvmEnv { //Helper class
+    List<String> vargs;
+    List<String> setup;
+    File stdout;
+    File stderr;
+    File workDir;
+    String pidFile;
+    long logSize;
+    JobConf conf;
+    Map<String, String> env;
+
+    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout,
+        File stderr, long logSize, File workDir, Map<String,String> env,
+        String pidFile, JobConf conf) {
+      this.setup = setup;
+      this.vargs = vargs;
+      this.stdout = stdout;
+      this.stderr = stderr;
+      // previously dropped, which left the field at 0 for every JVM
+      this.logSize = logSize;
+      this.workDir = workDir;
+      this.env = env;
+      this.pidFile = pidFile;
+      this.conf = conf;
+    }
+  }
+}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable pairing an optional {@link Task} with a flag that says whether
+ * the receiving JVM should die.
+ *
+ * Wire format: shouldDie (boolean), task-present (boolean) and, only when a
+ * task is present, is-map (boolean) followed by the task's own fields.
+ */
+class JvmTask implements Writable {
+  Task t;
+  boolean shouldDie;
+
+  /**
+   * @param t the task to hand out, or null when there is none
+   * @param shouldDie whether the JVM should die
+   */
+  public JvmTask(Task t, boolean shouldDie) {
+    this.t = t;
+    this.shouldDie = shouldDie;
+  }
+
+  /** No-argument constructor; fields are populated by readFields. */
+  public JvmTask() {}
+
+  /** @return the carried task, or null when none was set or read */
+  public Task getTask() {
+    return t;
+  }
+
+  /** @return the should-die flag */
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  /** Serializes the flag, a task-present marker and, if present, the task. */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (t != null) {
+      out.writeBoolean(true);
+      // the reader needs the concrete kind to instantiate the right class
+      out.writeBoolean(t.isMapTask());
+      t.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  /**
+   * Restores the state written by {@link #write(DataOutput)}. When the
+   * stream carries no task, any task held from an earlier read is cleared
+   * so that a reused instance never exposes stale data.
+   */
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    boolean taskComing = in.readBoolean();
+    if (taskComing) {
+      boolean isMap = in.readBoolean();
+      if (isMap) {
+        t = new MapTask();
+      } else {
+        t = new ReduceTask();
+      }
+      t.readFields(in);
+    } else {
+      t = null;
+    }
+  }
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep 19 00:31:41 2008
@@ -29,6 +29,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.JobTrackerMetricsInst;
+import org.apache.hadoop.mapred.JvmTask;
/** Implements MapReduce locally, in-process, for debugging. */
class LocalJobRunner implements JobSubmissionProtocol {
@@ -206,7 +207,7 @@
// TaskUmbilicalProtocol methods
- public Task getTask(TaskAttemptID taskid) { return null; }
+ public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) { return null; }
public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Sep 19 00:31:41 2008
@@ -108,6 +108,7 @@
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
protected boolean cleanupJob = false;
+ private Thread pingProgressThread;
//skip ranges based on failed ranges from previous attempts
private SortedRanges skipRanges = new SortedRanges();
@@ -344,7 +345,7 @@
* let the parent know that it's alive. It also pings the parent to see if it's alive.
*/
protected void startCommunicationThread(final TaskUmbilicalProtocol umbilical) {
- Thread thread = new Thread(new Runnable() {
+ pingProgressThread = new Thread(new Runnable() {
public void run() {
final int MAX_RETRIES = 3;
int remainingRetries = MAX_RETRIES;
@@ -407,8 +408,8 @@
}
}
}, "Comm thread for "+taskId);
- thread.setDaemon(true);
- thread.start();
+ pingProgressThread.setDaemon(true);
+ pingProgressThread.start();
LOG.debug(getTaskID() + " Progress/ping thread started");
}
@@ -596,6 +597,10 @@
commit(umbilical, outputCommitter);
}
taskDone.set(true);
+ pingProgressThread.interrupt();
+ try {
+ pingProgressThread.join();
+ } catch (InterruptedException ie) {}
sendLastUpdate(umbilical);
//signal the tasktracker that we are done
sendDone(umbilical);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Sep 19 00:31:41 2008
@@ -485,7 +485,9 @@
// and is addressed better at the TaskTracker to ensure this.
// @see {@link TaskTracker.transmitHeartbeat()}
if ((newState != TaskStatus.State.RUNNING &&
- newState != TaskStatus.State.COMMIT_PENDING ) &&
+ newState != TaskStatus.State.COMMIT_PENDING &&
+ newState != TaskStatus.State.INITIALIZED &&
+ newState != TaskStatus.State.UNASSIGNED) &&
(oldState == newState)) {
LOG.warn("Recieved duplicate status update of '" + newState +
"' for '" + taskid + "' of TIP '" + getTIPId() + "'");
@@ -496,7 +498,9 @@
// We have seen out of order status messagesmoving tasks from complete
// to running. This is a spot fix, but it should be addressed more
// globally.
- if (newState == TaskStatus.State.RUNNING &&
+ if ((newState == TaskStatus.State.RUNNING ||
+ newState == TaskStatus.State.UNASSIGNED ||
+ newState == TaskStatus.State.INITIALIZED) &&
(oldState == TaskStatus.State.FAILED ||
oldState == TaskStatus.State.KILLED ||
oldState == TaskStatus.State.SUCCEEDED ||
@@ -708,7 +712,9 @@
boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
TaskStatus st = taskStatuses.get(taskId);
if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
- || st.getRunState() == TaskStatus.State.COMMIT_PENDING)
+ || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ st.getRunState() == TaskStatus.State.INITIALIZED ||
+ st.getRunState() == TaskStatus.State.UNASSIGNED)
&& tasksToKill.put(taskId, shouldFail) == null ) {
String logStr = "Request received to " + (shouldFail ? "fail" : "kill")
+ " task '" + taskId + "' by user";
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Sep 19 00:31:41 2008
@@ -18,17 +18,25 @@
package org.apache.hadoop.mapred;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Enumeration;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.log4j.Appender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
/**
* A simple logger to handle the task-specific user logs.
@@ -50,8 +58,130 @@
}
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
- return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+ return new File(getBaseDir(taskid.toString()), filter.toString());
}
+ /**
+  * Resolves the file that really holds the given log of a task attempt,
+  * using the location recorded in the attempt's index file.
+  *
+  * @param taskid the task attempt whose log is wanted
+  * @param filter which log to locate
+  * @return the log file under the recorded location, or null when the
+  *         index file could not be read (the error is logged)
+  */
+ public static File getRealTaskLogFileLocation(TaskAttemptID taskid,
+     LogName filter) {
+   try {
+     LogFileDetail detail = getTaskLogFileDetail(taskid, filter);
+     File realDir = getBaseDir(detail.location);
+     return new File(realDir, filter.toString());
+   } catch (IOException ie) {
+     LOG.error("getTaskLogFileDetail threw an exception " + ie);
+     return null;
+   }
+ }
+ /**
+  * What the index file records about one log of a task attempt: the
+  * directory name the log really lives under, and the start offset and
+  * length of the attempt's portion of that log.
+  */
+ private static class LogFileDetail {
+   /** Prefix that precedes the directory name on the first index line. */
+   static final String LOCATION = "LOG_DIR:";
+   String location;
+   long start;
+   long length;
+ }
+
+ /**
+  * Reads the "log.index" file of a task attempt and extracts the details
+  * of one of its logs.
+  *
+  * The format of the index file is
+  *   LOG_DIR:<the dir where the task logs are really stored>
+  *   <logname>:<start-offset in that log file> <length>
+  * with one offset line per log (stdout, stderr, syslog).
+  *
+  * @param taskid the task attempt whose index file is read
+  * @param filter the log whose details are wanted
+  * @return the location plus start/length of the requested log; start and
+  *         length keep their default of 0 when no line names the log
+  * @throws IOException if the index file cannot be read or is empty
+  */
+ private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
+     LogName filter) throws IOException {
+   File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+   BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
+   // close the reader on every path, including the failure ones
+   try {
+     LogFileDetail l = new LogFileDetail();
+     String str = fis.readLine();
+     if (str == null) { //the file doesn't have anything
+       throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
+     }
+     l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
+         LogFileDetail.LOCATION.length());
+     //special cases are the debugout and profile.out files. They are guaranteed
+     //to be associated with each task attempt since jvm reuse is disabled
+     //when profiling/debugging is enabled
+     if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+       l.length = new File(getBaseDir(l.location), filter.toString()).length();
+       l.start = 0;
+       return l;
+     }
+     str = fis.readLine();
+     while (str != null) {
+       //look for the exact line containing the logname
+       if (str.contains(filter.toString())) {
+         // drop "<logname>:" and parse "<start> <length>"
+         str = str.substring(filter.toString().length()+1);
+         String[] startAndLen = str.split(" ");
+         l.start = Long.parseLong(startAndLen[0]);
+         l.length = Long.parseLong(startAndLen[1]);
+         break;
+       }
+       str = fis.readLine();
+     }
+     return l;
+   } finally {
+     fis.close();
+   }
+ }
+
+ public static File getIndexFile(String taskid) {
+ return new File(getBaseDir(taskid), "log.index");
+ }
+ /**
+  * @param taskid name of the per-attempt sub-directory
+  * @return the directory of that name under LOG_DIR
+  */
+ private static File getBaseDir(String taskid) {
+   File perAttemptDir = new File(LOG_DIR, taskid);
+   return perAttemptDir;
+ }
+ private static long prevOutLength;
+ private static long prevErrLength;
+ private static long prevLogLength;
+
+ /**
+  * Rewrites the index file of the current task attempt (currentTaskid).
+  *
+  * The format of the index file is
+  *   LOG_DIR:<firstTaskid, whose directory really stores the logs>
+  *   <LogName.STDOUT>:<start-offset in the stdout file> <length>
+  *   <LogName.STDERR>:<start-offset in the stderr file> <length>
+  *   <LogName.SYSLOG>:<start-offset in the syslog file> <length>
+  * where each start offset is the matching prev*Length value and each
+  * length is the current file length minus that offset.
+  *
+  * @param firstTaskid the attempt under whose directory the logs are stored
+  * @throws IOException if the index file cannot be opened or written
+  */
+ private static void writeToIndexFile(TaskAttemptID firstTaskid)
+     throws IOException {
+   File indexFile = getIndexFile(currentTaskid.toString());
+   BufferedOutputStream bos =
+     new BufferedOutputStream(new FileOutputStream(indexFile,false));
+   DataOutputStream dos = new DataOutputStream(bos);
+   // release the file handle even when one of the writes fails
+   try {
+     dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
+         LogName.STDOUT.toString()+":");
+     dos.writeBytes(Long.toString(prevOutLength)+" ");
+     dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
+         .length() - prevOutLength)+"\n"+LogName.STDERR+":");
+     dos.writeBytes(Long.toString(prevErrLength)+" ");
+     dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
+         .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
+     dos.writeBytes(Long.toString(prevLogLength)+" ");
+     dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
+         .length() - prevLogLength)+"\n");
+   } finally {
+     dos.close();
+   }
+ }
+ /**
+  * Records the current lengths of the stdout, stderr and syslog files of
+  * firstTaskid as the new prev*Length values.
+  */
+ private static void resetPrevLengths(TaskAttemptID firstTaskid) {
+   File outFile = getTaskLogFile(firstTaskid, LogName.STDOUT);
+   File errFile = getTaskLogFile(firstTaskid, LogName.STDERR);
+   File sysFile = getTaskLogFile(firstTaskid, LogName.SYSLOG);
+   prevOutLength = outFile.length();
+   prevErrLength = errFile.length();
+   prevLogLength = sysFile.length();
+ }
+ private volatile static TaskAttemptID currentTaskid = null;
+ @SuppressWarnings("unchecked")
+ public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid)
+ throws IOException {
+ System.out.flush();
+ System.err.flush();
+ Enumeration<Logger> allLoggers = LogManager.getCurrentLoggers();
+ while (allLoggers.hasMoreElements()) {
+ Logger l = allLoggers.nextElement();
+ Enumeration<Appender> allAppenders = l.getAllAppenders();
+ while (allAppenders.hasMoreElements()) {
+ Appender a = allAppenders.nextElement();
+ if (a instanceof TaskLogAppender) {
+ ((TaskLogAppender)a).flush();
+ }
+ }
+ }
+ if (currentTaskid != taskid) {
+ currentTaskid = taskid;
+ resetPrevLengths(firstTaskid);
+ }
+ writeToIndexFile(firstTaskid);
+ }
+
/**
* The filter for userlogs.
@@ -133,9 +263,9 @@
public Reader(TaskAttemptID taskid, LogName kind,
long start, long end) throws IOException {
// find the right log file
- File filename = getTaskLogFile(taskid, kind);
+ LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
// calculate the start and stop
- long size = filename.length();
+ long size = fileDetail.length;
if (start < 0) {
start += size + 1;
}
@@ -144,8 +274,11 @@
}
start = Math.max(0, Math.min(start, size));
end = Math.max(0, Math.min(end, size));
+ start += fileDetail.start;
+ end += fileDetail.start;
bytesRemaining = end - start;
- file = new FileInputStream(filename);
+ file = new FileInputStream(new File(getBaseDir(fileDetail.location),
+ kind.toString()));
// skip upto start
long pos = 0;
while (pos < start) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java Fri Sep 19 00:31:41 2008
@@ -61,6 +61,10 @@
}
}
}
+
+ /**
+  * Flushes the underlying writer, if there is one.
+  * NOTE(review): qw is not declared in this hunk; presumably it is the
+  * writer inherited from the log4j appender base class, which may still
+  * be unset when this is called -- confirm.
+  */
+ public void flush() {
+   if (qw != null) {
+     qw.flush();
+   }
+ }
@Override
public synchronized void close() {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Fri Sep 19 00:31:41 2008
@@ -27,6 +27,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskTracker;
@@ -235,30 +236,43 @@
* @return the pid of the task process.
*/
private String getPid(TaskAttemptID tipID) {
- Path pidFileName = getPidFilePath(tipID);
+ Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
if (pidFileName == null) {
return null;
}
return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
}
- private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ private static LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator("mapred.local.dir");
/**
* Get the pidFile path of a Task
* @param tipID
* @return pidFile's Path
*/
- Path getPidFilePath(TaskAttemptID tipID) {
+ public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
Path pidFileName = null;
try {
+ //this actually need not use a localdirAllocator since the PID
+ //files are really small..
pidFileName = lDirAlloc.getLocalPathToRead(
(TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
- taskTracker.getJobConf());
+ conf);
} catch (IOException i) {
// PID file is not there
LOG.debug("Failed to get pidFile name for " + tipID);
}
return pidFileName;
}
+ public void removePidFile(TaskAttemptID tid) {
+ if (taskTracker.isTaskMemoryManagerEnabled()) {
+ Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
+ if (pidFilePath != null) {
+ try {
+ FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
+ } catch(IOException ie) {}
+ }
+ }
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Sep 19 00:31:41 2008
@@ -20,7 +20,6 @@
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.filecache.*;
import org.apache.hadoop.util.*;
@@ -43,11 +42,16 @@
LogFactory.getLog(TaskRunner.class);
volatile boolean killed = false;
- private ShellCommandExecutor shexec; // shell terminal for running the task
private Task t;
+ private Object lock = new Object();
+ private volatile boolean done = false;
+ private int exitCode = -1;
+ private boolean exitCodeSet = false;
+
private TaskTracker tracker;
protected JobConf conf;
+ JvmManager jvmManager;
/**
* for cleaning up old map outputs
@@ -60,6 +64,7 @@
this.conf = conf;
this.mapOutputFile = new MapOutputFile(t.getJobID());
this.mapOutputFile.setConf(conf);
+ this.jvmManager = tracker.getJvmManagerInstance();
}
public Task getTask() { return t; }
@@ -78,7 +83,7 @@
*/
public void close() throws IOException {}
- private String stringifyPathArray(Path[] p){
+ private static String stringifyPathArray(Path[] p){
if (p == null){
return null;
}
@@ -143,7 +148,8 @@
true, Long.parseLong(
archivesTimestamps[i]),
new Path(workDir.
- getAbsolutePath()));
+ getAbsolutePath()),
+ false);
}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -171,7 +177,8 @@
false, Long.parseLong(
fileTimestamps[i]),
new Path(workDir.
- getAbsolutePath()));
+ getAbsolutePath()),
+ false);
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
@@ -185,17 +192,7 @@
out.close();
}
}
-
- // create symlinks for all the files in job cache dir in current
- // workingdir for streaming
- try{
- DistributedCache.createAllSymlink(conf, jobCacheDir,
- workDir);
- } catch(IOException ie){
- // Do not exit even if symlinks have not been created.
- LOG.warn(StringUtils.stringifyException(ie));
- }
-
+
if (!prepare()) {
return;
}
@@ -366,7 +363,7 @@
}
// Add main class and its arguments
- vargs.add(TaskTracker.Child.class.getName()); // main of Child
+ vargs.add(Child.class.getName()); // main of Child
// pass umbilical address
InetSocketAddress address = tracker.getTaskTrackerReportAddress();
vargs.add(address.getAddress().getHostAddress());
@@ -395,9 +392,7 @@
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
stdout.getParentFile().mkdirs();
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
- List<String> wrappedCommand =
- TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize, pidFile);
- LOG.debug("child jvm command : " + wrappedCommand.toString());
+
Map<String, String> env = new HashMap<String, String>();
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
@@ -408,9 +403,26 @@
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
- // Run the task as child of the parent TaskTracker process
- runChild(wrappedCommand, workDir, env, taskid);
-
+ tracker.taskInitialized(t.getTaskID());
+ LOG.info("Task ID: " + t.getTaskID() +" initialized");
+ jvmManager.launchJvm(t.getJobID(), t.isMapTask(),
+ jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
+ workDir, env, pidFile, conf));
+ synchronized (lock) {
+ while (!done) {
+ lock.wait();
+ }
+ }
+ tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
+ if (exitCodeSet) {
+ if (!killed && exitCode != 0) {
+ if (exitCode == 65) {
+ tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
+ }
+ throw new IOException("Task process exit with nonzero status of " +
+ exitCode + ".");
+ }
+ }
} catch (FSError e) {
LOG.fatal("FSError", e);
try {
@@ -445,31 +457,76 @@
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
tracker.reportTaskFinished(t.getTaskID(), false);
+ if (t.isMapTask()) {
+ tracker.addFreeMapSlot();
+ } else {
+ tracker.addFreeReduceSlot();
+ }
}
}
-
- /**
- * Run the child process
- */
- private void runChild(List<String> args, File dir,
- Map<String, String> env,
- TaskAttemptID taskid) throws IOException {
-
- shexec = new ShellCommandExecutor(args.toArray(new String[0]), dir, env);
- try {
- shexec.execute();
- } catch (IOException ioe) {
- // do nothing
- // error and output are appropriately redirected
- } finally { // handle the exit code
- int exit_code = shexec.getExitCode();
- tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
- if (!killed && exit_code != 0) {
- if (exit_code == 65) {
- tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
+
+ //Mostly for setting up the symlinks. Note that when we setup the distributed
+ //cache, we didn't create the symlinks. This is done on a per task basis
+ //by the currently executing task.
+ public static void setupWorkDir(JobConf conf) throws IOException {
+ File workDir = new File(".").getAbsoluteFile();
+ FileUtil.fullyDelete(workDir);
+ if (DistributedCache.getSymlink(conf)) {
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+ if (archives != null) {
+ for (int i = 0; i < archives.length; i++) {
+ String link = archives[i].getFragment();
+ if (link != null) {
+ link = workDir.toString() + Path.SEPARATOR + link;
+ File flink = new File(link);
+ if (!flink.exists()) {
+ FileUtil.symLink(localArchives[i].toString(), link);
+ }
+ }
}
- throw new IOException("Task process exit with nonzero status of " +
- exit_code + ": "+ shexec);
+ }
+ if (files != null) {
+ for (int i = 0; i < files.length; i++) {
+ String link = files[i].getFragment();
+ if (link != null) {
+ link = workDir.toString() + Path.SEPARATOR + link;
+ File flink = new File(link);
+ if (!flink.exists()) {
+ FileUtil.symLink(localFiles[i].toString(), link);
+ }
+ }
+ }
+ }
+ }
+ File jobCacheDir = null;
+ if (conf.getJar() != null) {
+ jobCacheDir = new File(
+ new Path(conf.getJar()).getParent().toString());
+ }
+
+ // create symlinks for all the files in job cache dir in current
+ // workingdir for streaming
+ try{
+ DistributedCache.createAllSymlink(conf, jobCacheDir,
+ workDir);
+ } catch(IOException ie){
+ // Do not exit even if symlinks have not been created.
+ LOG.warn(StringUtils.stringifyException(ie));
+ }
+ // add java.io.tmpdir given by mapred.child.tmp
+ String tmp = conf.get("mapred.child.tmp", "./tmp");
+ Path tmpDir = new Path(tmp);
+
+ // if temp directory path is not absolute
+ // prepend it with workDir.
+ if (!tmpDir.isAbsolute()) {
+ tmpDir = new Path(workDir.toString(), tmp);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
+ throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
}
}
@@ -478,13 +535,17 @@
* Kill the child process
*/
public void kill() {
- if (shexec != null) {
- Process process = shexec.getProcess();
- if (process != null) {
- process.destroy();
- }
- }
killed = true;
+ jvmManager.taskKilled(this);
+ }
+ /**
+  * Marks this runner as done and wakes a thread waiting on the lock
+  * (run() waits on the same lock until done becomes true).
+  */
+ public void signalDone() {
+ synchronized (lock) {
+ // set the flag under the lock so the waiter re-checks it after wake-up
+ done = true;
+ lock.notify();
+ }
+ }
+ /**
+  * Records the exit code of the child process and flags that an exit
+  * code is available; run() only inspects exitCode when that flag is set.
+  * @param exitCode the exit status to record
+  */
+ public void setExitCode(int exitCode) {
+ this.exitCodeSet = true;
+ this.exitCode = exitCode;
}
-
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Sep 19 00:31:41 2008
@@ -41,11 +41,11 @@
// what state is the task in?
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING}
+ COMMIT_PENDING, INITIALIZED}
private TaskAttemptID taskid;
private float progress;
- private State runState;
+ private volatile State runState;
private String diagnosticInfo;
private String stateString;
private String taskTracker;