You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/04/12 03:14:45 UTC
svn commit: r1091273 - in /hadoop/common/branches/branch-0.20-security: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapreduce/security/
Author: acmurthy
Date: Tue Apr 12 01:14:44 2011
New Revision: 1091273
URL: http://svn.apache.org/viewvc?rev=1091273&view=rev
Log:
MAPREDUCE-2429. Validate JVM in TaskUmbilicalProtocol. Contributed by Siddharth Seth.
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Tue Apr 12 01:14:44 2011
@@ -4,6 +4,9 @@ Release 0.20.204.0 - unreleased
BUG FIXES
+ MAPREDUCE-2429. Validate JVM in TaskUmbilicalProtocol. (Siddharth Seth via
+ acmurthy)
+
MAPREDUCE-2418. Show job errors in JobHistory page. (Siddharth Seth via
acmurthy)
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Child.java Tue Apr 12 01:14:44 2011
@@ -169,6 +169,7 @@ class Child {
UserGroupInformation childUGI = null;
+ final JvmContext jvmContext = context;
try {
while (true) {
taskid = null;
@@ -250,13 +251,14 @@ class Child {
// Create a final reference to the task for the doAs block
final Task taskFinal = task;
+ taskFinal.setJvmContext(jvmContext);
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
try {
// use job-specified working directory
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
- taskFinal.run(job, umbilical); // run the task
+ taskFinal.run(job, umbilical); // run the task
} finally {
TaskLog.syncLogs
(logLocation, taskid, isCleanup, logIsSegmented(job));
@@ -275,7 +277,7 @@ class Child {
}
} catch (FSError e) {
LOG.fatal("FSError from child", e);
- umbilical.fsError(taskid, e.getMessage());
+ umbilical.fsError(taskid, e.getMessage(), jvmContext);
} catch (Exception exception) {
LOG.warn("Error running child", exception);
try {
@@ -301,7 +303,7 @@ class Child {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
if (taskid != null) {
- umbilical.reportDiagnosticInfo(taskid, baos.toString());
+ umbilical.reportDiagnosticInfo(taskid, baos.toString(), jvmContext);
}
} catch (Throwable throwable) {
LOG.fatal("Error running child : "
@@ -311,7 +313,7 @@ class Child {
String cause = tCause == null
? throwable.getMessage()
: StringUtils.stringifyException(tCause);
- umbilical.fatalError(taskid, cause);
+ umbilical.fatalError(taskid, cause, jvmContext);
}
} finally {
RPC.stopProxy(umbilical);
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Tue Apr 12 01:14:44 2011
@@ -53,19 +53,23 @@ public class IsolationRunner {
return TaskUmbilicalProtocol.versionID;
}
- public void done(TaskAttemptID taskid) throws IOException {
+ public void done(TaskAttemptID taskid, JvmContext jvmContext)
+ throws IOException {
LOG.info("Task " + taskid + " reporting done.");
}
- public void fsError(TaskAttemptID taskId, String message) throws IOException {
+ public void fsError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
LOG.info("Task " + taskId + " reporting file system error: " + message);
}
- public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
+ public void shuffleError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
LOG.info("Task " + taskId + " reporting shuffle error: " + message);
}
- public void fatalError(TaskAttemptID taskId, String msg) throws IOException{
+ public void fatalError(TaskAttemptID taskId, String msg,
+ JvmContext jvmContext) throws IOException {
LOG.info("Task " + taskId + " reporting fatal error: " + msg);
}
@@ -73,20 +77,21 @@ public class IsolationRunner {
return null;
}
- public boolean ping(TaskAttemptID taskid) throws IOException {
+ public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException {
return true;
}
- public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException {
- statusUpdate(taskId, taskStatus);
+ public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus,
+ JvmContext jvmContext) throws IOException, InterruptedException {
+ statusUpdate(taskId, taskStatus, jvmContext);
}
- public boolean canCommit(TaskAttemptID taskid) throws IOException {
+ public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext)
+ throws IOException {
return true;
}
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context)
throws IOException, InterruptedException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskId);
@@ -103,18 +108,20 @@ public class IsolationRunner {
return true;
}
- public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
+ public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+ JvmContext jvmContext) throws IOException {
LOG.info("Task " + taskid + " has problem " + trace);
}
- public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
- int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
- return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
- false);
+ public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+ int fromEventId, int maxLocs, TaskAttemptID id, JvmContext jvmContext)
+ throws IOException {
+ return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
+ false);
}
public void reportNextRecordRange(TaskAttemptID taskid,
- SortedRanges.Range range) throws IOException {
+ SortedRanges.Range range, JvmContext jvmContext) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JvmManager.java Tue Apr 12 01:14:44 2011
@@ -128,6 +128,14 @@ class JvmManager {
}
}
+ /**
+ * Checks whether the given JVM is currently running the given task,
+ * dispatching to the map or reduce JVM manager according to the JVM type.
+ * @param tip the task the caller claims to be running
+ * @param jvmId id of the calling JVM; may be null
+ * @return false if jvmId is null, otherwise the per-type manager's verdict
+ */
+ public boolean validateTipToJvm(TaskInProgress tip, JVMId jvmId) {
+ if (jvmId == null) {
+ // jvmId.isMapJVM() would throw an NPE; an unidentified JVM is never valid.
+ return false;
+ }
+ if (jvmId.isMapJVM()) {
+ return mapJvmManager.validateTipToJvm(tip, jvmId);
+ } else {
+ return reduceJvmManager.validateTipToJvm(tip, jvmId);
+ }
+ }
+
public TaskInProgress getTaskForJvm(JVMId jvmId)
throws IOException {
if (jvmId.isMapJVM()) {
@@ -223,6 +231,23 @@ class JvmManager {
jvmIdToRunner.get(jvmId).setBusy(true);
}
+ /**
+ * Returns true only if jvmId is known and is currently running exactly the
+ * given task.
+ * @param tip the task the caller claims to be running; null never validates
+ * @param jvmId id of the calling JVM; null never validates
+ */
+ synchronized public boolean validateTipToJvm(TaskInProgress tip, JVMId jvmId) {
+ if (jvmId == null) {
+ LOG.warn("Null jvmId. Cannot verify Jvm. validateTipToJvm returning false");
+ return false;
+ }
+ TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+ if (taskRunner == null) {
+ return false; // JvmId not known.
+ }
+ // Reference comparison is deliberate: the runner registered for this JVM
+ // must hold the very same TaskInProgress object the caller looked up
+ // (presumably from the TaskTracker's task maps). A null tip is rejected so
+ // that an unknown task can never match a runner without a tip.
+ return tip != null && taskRunner.getTaskInProgress() == tip;
+ }
+
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
throws IOException {
if (jvmToRunningTask.containsKey(jvmId)) {
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Tue Apr 12 01:14:44 2011
@@ -311,7 +311,7 @@ class LocalJobRunner implements JobSubmi
public JvmTask getTask(JvmContext context) { return null; }
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context)
throws IOException, InterruptedException {
LOG.info(taskStatus.getStateString());
float taskIndex = mapIds.indexOf(taskId);
@@ -333,9 +333,10 @@ class LocalJobRunner implements JobSubmi
* and it is waiting for the commit Response
*/
public void commitPending(TaskAttemptID taskid,
- TaskStatus taskStatus)
+ TaskStatus taskStatus,
+ JvmContext jvmContext)
throws IOException, InterruptedException {
- statusUpdate(taskid, taskStatus);
+ statusUpdate(taskid, taskStatus, jvmContext);
}
/**
@@ -347,51 +348,55 @@ class LocalJobRunner implements JobSubmi
completedTaskCounters.incrAllCounters(task.getCounters());
}
- public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
+ public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+ JvmContext jvmContext) {
// Ignore for now
}
public void reportNextRecordRange(TaskAttemptID taskid,
- SortedRanges.Range range) throws IOException {
+ SortedRanges.Range range, JvmContext jvmContext) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
- public boolean ping(TaskAttemptID taskid) throws IOException {
+ public boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException {
return true;
}
- public boolean canCommit(TaskAttemptID taskid)
+ public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext)
throws IOException {
return true;
}
- public void done(TaskAttemptID taskId) throws IOException {
+ public void done(TaskAttemptID taskId, JvmContext jvmContext)
+ throws IOException {
int taskIndex = mapIds.indexOf(taskId);
- if (taskIndex >= 0) { // mapping
+ if (taskIndex >= 0) { // mapping
status.setMapProgress(1.0f);
} else {
status.setReduceProgress(1.0f);
}
}
- public synchronized void fsError(TaskAttemptID taskId, String message)
- throws IOException {
- LOG.fatal("FSError: "+ message + "from task: " + taskId);
+ public synchronized void fsError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
+ // jvmContext is unused here; this runner does not validate JVMs.
+ LOG.fatal("FSError: " + message + " from task: " + taskId);
}
- public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
- LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
+ public void shuffleError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
+ // jvmContext is unused here; this runner does not validate JVMs.
+ LOG.fatal("shuffleError: " + message + " from task: " + taskId);
}
- public synchronized void fatalError(TaskAttemptID taskId, String msg)
- throws IOException {
- LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+ public synchronized void fatalError(TaskAttemptID taskId, String msg,
+ JvmContext jvmContext) throws IOException {
+ // jvmContext is unused here; this runner does not validate JVMs.
+ LOG.fatal("Fatal: " + msg + " from task: " + taskId);
}
- public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
- int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
+ public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
+ int fromEventId, int maxLocs, TaskAttemptID id, JvmContext jvmContext)
+ throws IOException {
return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
- false);
+ false);
}
@Override
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Apr 12 01:14:44 2011
@@ -341,12 +341,13 @@ class MapTask extends Task {
}
@Override
- public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
+ public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
// start thread that will handle communication with parent
- TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+ TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
+ jvmContext);
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Apr 12 01:14:44 2011
@@ -356,7 +356,8 @@ class ReduceTask extends Task {
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
- TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
+ TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
+ jvmContext);
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewReducer();
initialize(job, getJobID(), reporter, useNewApi);
@@ -1330,7 +1331,7 @@ class ReduceTask extends Task {
LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " +
StringUtils.stringifyException(e));
try {
- umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
+ umbilical.fsError(reduceTask.getTaskID(), e.getMessage(), jvmContext);
} catch (IOException io) {
LOG.error("Could not notify TT of FSError: " +
StringUtils.stringifyException(io));
@@ -2299,7 +2300,7 @@ class ReduceTask extends Task {
"Killing task " + getTaskID() + ".");
umbilical.shuffleError(getTaskID(),
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
- + " bailing-out.");
+ + " bailing-out.", jvmContext);
}
}
@@ -2857,7 +2858,7 @@ class ReduceTask extends Task {
umbilical.getMapCompletionEvents(reduceTask.getJobID(),
fromEventId.get(),
MAX_EVENTS_TO_FETCH,
- reduceTask.getTaskID());
+ reduceTask.getTaskID(), jvmContext);
TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
// Check if the reset is required.
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java Tue Apr 12 01:14:44 2011
@@ -154,6 +154,7 @@ abstract public class Task implements Wr
private String pidFile = "";
protected TaskUmbilicalProtocol umbilical;
protected SecretKey tokenSecret;
+ protected JvmContext jvmContext;
////////////////////////////////////////////
// Constructors
@@ -220,6 +221,21 @@ abstract public class Task implements Wr
return this.tokenSecret;
}
+ /**
+ * Sets the context of the JVM running this task. The task forwards it with
+ * its umbilical calls (e.g. fatalError, and via the TaskReporter it creates)
+ * so the TaskTracker can validate the calling JVM.
+ * @param jvmContext context of the JVM running this task
+ */
+ public void setJvmContext(JvmContext jvmContext) {
+ this.jvmContext = jvmContext;
+ }
+
+ /**
+ * Gets the context of the JVM running this task.
+ * @return the jvm context last set via setJvmContext
+ */
+ public JvmContext getJvmContext() {
+ return this.jvmContext;
+ }
/**
* Get the index of this task within the job.
@@ -269,7 +285,7 @@ abstract public class Task implements Wr
? StringUtils.stringifyException(throwable)
: StringUtils.stringifyException(tCause);
try {
- umbilical.fatalError(id, cause);
+ umbilical.fatalError(id, cause, jvmContext);
} catch (IOException ioe) {
LOG.fatal("Failed to contact the tasktracker", ioe);
System.exit(-1);
@@ -446,8 +462,7 @@ abstract public class Task implements Wr
* @param umbilical for progress reports
*/
public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
- throws IOException, ClassNotFoundException, InterruptedException;
-
+ throws IOException, ClassNotFoundException, InterruptedException;
/** Return an approprate thread runner for this task.
* @param tip TODO*/
@@ -509,6 +524,7 @@ abstract public class Task implements Wr
private TaskUmbilicalProtocol umbilical;
private InputSplit split = null;
private Progress taskProgress;
+ private JvmContext jvmContext;
private Thread pingThread = null;
private static final int PROGRESS_STATUS_LEN_LIMIT = 512;
private boolean done = true;
@@ -522,9 +538,10 @@ abstract public class Task implements Wr
private AtomicBoolean progressFlag = new AtomicBoolean(false);
TaskReporter(Progress taskProgress,
- TaskUmbilicalProtocol umbilical) {
+ TaskUmbilicalProtocol umbilical, JvmContext jvmContext) {
this.umbilical = umbilical;
this.taskProgress = taskProgress;
+ this.jvmContext = jvmContext;
}
// getters and setters for flag
void setProgressFlag() {
@@ -630,12 +647,12 @@ abstract public class Task implements Wr
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
counters);
- taskFound = umbilical.statusUpdate(taskId, taskStatus);
+ taskFound = umbilical.statusUpdate(taskId, taskStatus, jvmContext);
taskStatus.clearStatus();
}
else {
// send ping
- taskFound = umbilical.ping(taskId);
+ taskFound = umbilical.ping(taskId, jvmContext);
}
// if Task Tracker is not aware of our task ID (probably because it died and
@@ -709,7 +726,7 @@ abstract public class Task implements Wr
if (LOG.isDebugEnabled()) {
LOG.debug("sending reportNextRecordRange " + range);
}
- umbilical.reportNextRecordRange(taskId, range);
+ umbilical.reportNextRecordRange(taskId, range, jvmContext);
}
/**
@@ -783,7 +800,7 @@ abstract public class Task implements Wr
// say the task tracker that task is commit pending
while (true) {
try {
- umbilical.commitPending(taskId, taskStatus);
+ umbilical.commitPending(taskId, taskStatus, jvmContext);
break;
} catch (InterruptedException ie) {
// ignore
@@ -826,7 +843,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus, jvmContext)) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
}
@@ -883,7 +900,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- umbilical.done(getTaskID());
+ umbilical.done(getTaskID(), jvmContext);
LOG.info("Task '" + taskId + "' done.");
return;
} catch (IOException ie) {
@@ -903,7 +920,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- while (!umbilical.canCommit(taskId)) {
+ while (!umbilical.canCommit(taskId, jvmContext)) {
try {
Thread.sleep(1000);
} catch(InterruptedException ie) {
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Apr 12 01:14:44 2011
@@ -2980,6 +2980,12 @@ public class TaskTracker implements MRCo
}
}
+ /**
+ * Verifies that the JVM described by jvmContext is the one currently running
+ * tip, rejecting the umbilical request otherwise.
+ * @param tip the task the request is made for
+ * @param jvmContext context sent by the calling JVM; may be null
+ * @param taskid attempt id used in the error message
+ * @throws IOException if jvmContext is null or the JVM is not running tip
+ */
+ private void validateJVM(TaskInProgress tip, JvmContext jvmContext,
+ TaskAttemptID taskid) throws IOException {
+ if (jvmContext == null) {
+ // Without this check, jvmContext.jvmId below would throw an NPE.
+ throw new IOException("JvmValidate Failed. JvmContext is null - cannot"
+ + " validate JVM for task: " + taskid);
+ }
+ if (!jvmManager.validateTipToJvm(tip, jvmContext.jvmId)) {
+ throw new IOException("JvmValidate Failed. Ignoring request from task: "
+ + taskid + ", with JvmId: " + jvmContext.jvmId);
+ }
+ }
+
private void authorizeJVM(org.apache.hadoop.mapreduce.JobID jobId)
throws IOException {
String currentJobId =
@@ -3039,11 +3045,13 @@ public class TaskTracker implements MRCo
* Called periodically to report Task progress, from 0.0 to 1.0.
*/
public synchronized boolean statusUpdate(TaskAttemptID taskid,
- TaskStatus taskStatus)
+ TaskStatus taskStatus,
+ JvmContext jvmContext)
throws IOException {
authorizeJVM(taskid.getJobID());
TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
+ validateJVM(tip, jvmContext, taskid);
tip.reportProgress(taskStatus);
return true;
} else {
@@ -3056,9 +3064,16 @@ public class TaskTracker implements MRCo
* Called when the task dies before completion, and we want to report back
* diagnostic info
*/
- public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
+ public synchronized void reportDiagnosticInfo(TaskAttemptID taskid,
+ String info, JvmContext jvmContext) throws IOException {
authorizeJVM(taskid.getJobID());
- reportDiagnosticInfoInternal(taskid, info);
+ TaskInProgress tip = tasks.get(taskid);
+ if (tip != null) {
+ validateJVM(tip, jvmContext, taskid);
+ tip.reportDiagnosticInfo(info);
+ } else {
+ LOG.warn("Error from unknown child task: "+taskid+". Ignored.");
+ }
}
/**
* Meant to be used internally
@@ -3077,10 +3092,11 @@ public class TaskTracker implements MRCo
}
public synchronized void reportNextRecordRange(TaskAttemptID taskid,
- SortedRanges.Range range) throws IOException {
+ SortedRanges.Range range, JvmContext jvmContext) throws IOException {
authorizeJVM(taskid.getJobID());
TaskInProgress tip = tasks.get(taskid);
if (tip != null) {
+ validateJVM(tip, jvmContext, taskid);
tip.reportNextRecordRange(range);
} else {
LOG.warn("reportNextRecordRange from unknown child task: "+taskid+". " +
@@ -3088,10 +3104,17 @@ public class TaskTracker implements MRCo
}
}
- /** Child checking to see if we're alive. Normally does nothing.*/
- public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
+ /** Child checking to see if we're alive. Normally does nothing. */
+ public synchronized boolean ping(TaskAttemptID taskid, JvmContext jvmContext)
+ throws IOException {
authorizeJVM(taskid.getJobID());
- return tasks.get(taskid) != null;
+ TaskInProgress tip = tasks.get(taskid);
+ if (tip != null) {
+ validateJVM(tip, jvmContext, taskid);
+ return true;
+ } else {
+ return false;
+ }
}
/**
@@ -3099,33 +3122,38 @@ public class TaskTracker implements MRCo
* and it is waiting for the commit Response
*/
public synchronized void commitPending(TaskAttemptID taskid,
- TaskStatus taskStatus)
+ TaskStatus taskStatus,
+ JvmContext jvmContext)
throws IOException {
authorizeJVM(taskid.getJobID());
LOG.info("Task " + taskid + " is in commit-pending," +"" +
" task state:" +taskStatus.getRunState());
- statusUpdate(taskid, taskStatus);
+ // validateJVM is done in statusUpdate
+ statusUpdate(taskid, taskStatus, jvmContext);
reportTaskFinished(taskid, true);
}
/**
* Child checking whether it can commit
*/
- public synchronized boolean canCommit(TaskAttemptID taskid)
- throws IOException {
+ public synchronized boolean canCommit(TaskAttemptID taskid,
+ JvmContext jvmContext) throws IOException {
authorizeJVM(taskid.getJobID());
- return commitResponses.contains(taskid); //don't remove it now
+ TaskInProgress tip = tasks.get(taskid);
+ if (tip == null) {
+ // Fail explicitly (as getMapCompletionEvents does) instead of surfacing
+ // a misleading JVM-validation failure for a task we do not know.
+ throw new IOException("Unknown task; " + taskid
+ + ". Ignoring canCommit Request");
+ }
+ validateJVM(tip, jvmContext, taskid);
+ return commitResponses.contains(taskid); // don't remove it now
}
/**
* The task is done.
*/
- public synchronized void done(TaskAttemptID taskid)
+ public synchronized void done(TaskAttemptID taskid, JvmContext jvmContext)
throws IOException {
authorizeJVM(taskid.getJobID());
TaskInProgress tip = tasks.get(taskid);
- commitResponses.remove(taskid);
if (tip != null) {
+ validateJVM(tip, jvmContext, taskid);
+ commitResponses.remove(taskid);
tip.reportDone();
} else {
LOG.warn("Unknown child task done: "+taskid+". Ignored.");
@@ -3136,22 +3164,36 @@ public class TaskTracker implements MRCo
/**
* A reduce-task failed to shuffle the map-outputs. Kill the task.
*/
- public synchronized void shuffleError(TaskAttemptID taskId, String message)
+ public synchronized void shuffleError(TaskAttemptID taskId, String message, JvmContext jvmContext)
throws IOException {
authorizeJVM(taskId.getJobID());
- LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
TaskInProgress tip = runningTasks.get(taskId);
- tip.reportDiagnosticInfo("Shuffle Error: " + message);
- purgeTask(tip, true);
+ if (tip != null) {
+ validateJVM(tip, jvmContext, taskId);
+ LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: "
+ + message);
+ tip.reportDiagnosticInfo("Shuffle Error: " + message);
+ purgeTask(tip, true);
+ } else {
+ LOG.warn("Unknown child task shuffleError: " + taskId + ". Ignored.");
+ }
}
/**
* A child task had a local filesystem error. Kill the task.
*/
- public synchronized void fsError(TaskAttemptID taskId, String message)
- throws IOException {
+ public synchronized void fsError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
authorizeJVM(taskId.getJobID());
- fsErrorInternal(taskId, message);
+ TaskInProgress tip = runningTasks.get(taskId);
+ if (tip != null) {
+ validateJVM(tip, jvmContext, taskId);
+ LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
+ tip.reportDiagnosticInfo("FSError: " + message);
+ purgeTask(tip, true);
+ } else {
+ LOG.warn("Unknown child task fsError: "+taskId+". Ignored.");
+ }
}
/**
* Meant to be used internally
@@ -3170,18 +3212,29 @@ public class TaskTracker implements MRCo
/**
* A child task had a fatal error. Kill the task.
*/
- public synchronized void fatalError(TaskAttemptID taskId, String msg)
- throws IOException {
+ public synchronized void fatalError(TaskAttemptID taskId, String msg,
+ JvmContext jvmContext) throws IOException {
authorizeJVM(taskId.getJobID());
- LOG.fatal("Task: " + taskId + " - Killed : " + msg);
TaskInProgress tip = runningTasks.get(taskId);
- tip.reportDiagnosticInfo("Error: " + msg);
- purgeTask(tip, true);
+ if (tip != null) {
+ validateJVM(tip, jvmContext, taskId);
+ LOG.fatal("Task: " + taskId + " - Killed : " + msg);
+ tip.reportDiagnosticInfo("Error: " + msg);
+ purgeTask(tip, true);
+ } else {
+ LOG.warn("Unknown child task fatalError: "+taskId+". Ignored.");
+ }
}
public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
- JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id)
- throws IOException {
+ JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id,
+ JvmContext jvmContext) throws IOException {
+ TaskInProgress tip = runningTasks.get(id);
+ if (tip == null) {
+ throw new IOException("Unknown task; " + id
+ + ". Ignoring getMapCompletionEvents Request");
+ }
+ validateJVM(tip, jvmContext, id);
authorizeJVM(jobId);
TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
synchronized (shouldReset) {
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Apr 12 01:14:44 2011
@@ -61,7 +61,7 @@ public interface TaskUmbilicalProtocol e
* Version 18 Added fatalError for child to communicate fatal errors to TT
* */
- public static final long versionID = 18L;
+ public static final long versionID = 19L;
/**
* Called when a child task process starts, to get its task.
@@ -77,66 +77,78 @@ public interface TaskUmbilicalProtocol e
*
* @param taskId task-id of the child
* @param taskStatus status of the child
+ * @param jvmContext context the jvmContext running the task.
* @throws IOException
* @throws InterruptedException
* @return True if the task is known
*/
- boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException;
+ boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,
+ JvmContext context) throws IOException, InterruptedException;
/** Report error messages back to parent. Calls should be sparing, since all
* such messages are held in the job tracker.
* @param taskid the id of the task involved
* @param trace the text to report
+ * @param jvmContext context the jvmContext running the task.
*/
- void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
+ void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+ JvmContext jvmContext) throws IOException;
/**
* Report the record range which is going to process next by the Task.
* @param taskid the id of the task involved
* @param range the range of record sequence nos
+ * @param jvmContext context the jvmContext running the task.
* @throws IOException
*/
- void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
- throws IOException;
+ void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range,
+ JvmContext jvmContext) throws IOException;
- /** Periodically called by child to check if parent is still alive.
+ /** Periodically called by child to check if parent is still alive.
+ * @param taskid the id of the task involved
+ * @param jvmContext context the jvmContext running the task.
* @return True if the task is known
*/
- boolean ping(TaskAttemptID taskid) throws IOException;
+ boolean ping(TaskAttemptID taskid, JvmContext jvmContext) throws IOException;
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this.
* @param taskid task's id
+ * @param jvmContext context the jvmContext running the task.
*/
- void done(TaskAttemptID taskid) throws IOException;
+ void done(TaskAttemptID taskid, JvmContext jvmContext) throws IOException;
/**
* Report that the task is complete, but its commit is pending.
*
* @param taskId task's id
* @param taskStatus status of the child
+ * @param jvmContext context the jvmContext running the task.
* @throws IOException
*/
- void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException;
+ void commitPending(TaskAttemptID taskId, TaskStatus taskStatus,
+ JvmContext jvmContext) throws IOException, InterruptedException;
/**
* Polling to know whether the task can go-ahead with commit
* @param taskid
+ * @param jvmContext context the jvmContext running the task.
* @return true/false
* @throws IOException
*/
- boolean canCommit(TaskAttemptID taskid) throws IOException;
+ boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext) throws IOException;
- /** Report that a reduce-task couldn't shuffle map-outputs.*/
- void shuffleError(TaskAttemptID taskId, String message) throws IOException;
+ /** Report that a reduce-task couldn't shuffle map-outputs. */
+ void shuffleError(TaskAttemptID taskId, String message, JvmContext jvmContext)
+ throws IOException;
/** Report that the task encounted a local filesystem error.*/
- void fsError(TaskAttemptID taskId, String message) throws IOException;
+ void fsError(TaskAttemptID taskId, String message, JvmContext jvmContext)
+ throws IOException;
/** Report that the task encounted a fatal error.*/
- void fatalError(TaskAttemptID taskId, String message) throws IOException;
+ void fatalError(TaskAttemptID taskId, String message, JvmContext jvmContext)
+ throws IOException;
/** Called by a reduce task to get the map output locations for finished maps.
* Returns an update centered around the map-task-completion-events.
@@ -154,7 +166,8 @@ public interface TaskUmbilicalProtocol e
MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromIndex,
int maxLocs,
- TaskAttemptID id)
+ TaskAttemptID id,
+ JvmContext jvmContext)
throws IOException;
/**
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Tue Apr 12 01:14:44 2011
@@ -581,7 +581,7 @@ public class MiniMRCluster {
new TaskAttemptID(jtId, jobId.getId(), false, 0, 0);
return taskTrackerList.get(index).getTaskTracker()
.getMapCompletionEvents(jobId, 0, max,
- dummy);
+ dummy, null);
}
/**
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTaskCommit.java Tue Apr 12 01:14:44 2011
@@ -99,32 +99,37 @@ public class TestTaskCommit extends Hado
boolean taskDone = false;
@Override
- public boolean canCommit(TaskAttemptID taskid) throws IOException {
+ public boolean canCommit(TaskAttemptID taskid, JvmContext jvmContext)
+ throws IOException {
return false;
}
@Override
- public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException {
+ public void commitPending(TaskAttemptID taskId, TaskStatus taskStatus,
+ JvmContext jvmContext) throws IOException, InterruptedException {
fail("Task should not go to commit-pending");
}
@Override
- public void done(TaskAttemptID taskid) throws IOException {
+ public void done(TaskAttemptID taskid, JvmContext jvmContext)
+ throws IOException {
taskDone = true;
}
@Override
- public void fatalError(TaskAttemptID taskId, String message)
- throws IOException { }
+ public void fatalError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
+ }
@Override
- public void fsError(TaskAttemptID taskId, String message)
- throws IOException { }
+ public void fsError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
+ }
@Override
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
- int fromIndex, int maxLocs, TaskAttemptID id) throws IOException {
+ int fromIndex, int maxLocs, TaskAttemptID id, JvmContext jvmContext)
+ throws IOException {
return null;
}
@@ -134,28 +139,29 @@ public class TestTaskCommit extends Hado
}
@Override
- public boolean ping(TaskAttemptID taskid) throws IOException {
+ public boolean ping(TaskAttemptID taskid, JvmContext jvmContext)
+ throws IOException {
return true;
}
@Override
- public void reportDiagnosticInfo(TaskAttemptID taskid, String trace)
- throws IOException {
+ public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
+ JvmContext jvmContext) throws IOException {
}
@Override
- public void reportNextRecordRange(TaskAttemptID taskid, Range range)
- throws IOException {
+ public void reportNextRecordRange(TaskAttemptID taskid, Range range,
+ JvmContext jvmContext) throws IOException {
}
@Override
- public void shuffleError(TaskAttemptID taskId, String message)
- throws IOException {
+ public void shuffleError(TaskAttemptID taskId, String message,
+ JvmContext jvmContext) throws IOException {
}
@Override
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
- throws IOException, InterruptedException {
+ public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,
+ JvmContext jvmContext) throws IOException, InterruptedException {
return true;
}
@@ -166,9 +172,9 @@ public class TestTaskCommit extends Hado
}
@Override
- public void
- updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
- long[] sizes) throws IOException {
+ public void updatePrivateDistributedCacheSizes(
+ org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+ throws IOException {
// NOTHING
}
}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=1091273&r1=1091272&r2=1091273&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Tue Apr 12 01:14:44 2011
@@ -102,7 +102,7 @@ public class TestUmbilicalProtocolWithJo
proxy = (TaskUmbilicalProtocol) RPC.getProxy(
TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
addr, conf);
- proxy.ping(null);
+ proxy.ping(null, null);
} finally {
server.stop();
if (proxy != null) {