You are viewing a plain-text version of this content; the hyperlink to the canonical version (originally on the word "here") was lost in the conversion.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2010/03/16 19:00:00 UTC
svn commit: r923907 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/TaskStatus.java
src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
Author: szetszwo
Date: Tue Mar 16 18:00:00 2010
New Revision: 923907
URL: http://svn.apache.org/viewvc?rev=923907&view=rev
Log:
MAPREDUCE-1482. Truncate state string and diagnostic information in TaskStatus. Contributed by Amar Kamat
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=923907&r1=923906&r2=923907&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar 16 18:00:00 2010
@@ -211,6 +211,9 @@ Trunk (unreleased changes)
MAPREDUCE-1403. Save the size and number of distributed cache artifacts in
the configuration. (Arun Murthy via cdouglas)
+ MAPREDUCE-1482. Truncate state string and diagnostic information in
+ TaskStatus. (Amar Kamat via szetszwo)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=923907&r1=923906&r2=923907&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar 16 18:00:00 2010
@@ -60,7 +60,20 @@ public abstract class TaskStatus impleme
private Counters counters;
private boolean includeCounters;
private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
+
+ /** Default maximum length of the diagnostic-info and state strings. */
+ static final int MAX_STRING_SIZE = 1024;
+ /**
+ * Returns the maximum length of the strings kept in {@link TaskStatus}
+ * (the diagnostic info and the state string); longer values are truncated
+ * to this length by the corresponding setters.
+ * Test cases can override this method to control that limit. Note that the
+ * {@link TaskStatus} is never exposed to clients or users (i.e. Map or
+ * Reduce tasks), and hence users cannot override this API to pass large
+ * strings in {@link TaskStatus}.
+ * This method is also invoked while the object is being constructed (the
+ * constructor calls the truncating setters), so an override must not depend
+ * on fields of the subclass that are initialized after {@code super(...)}.
+ *
+ * @return the maximum string length; {@link #MAX_STRING_SIZE} by default
+ */
+ protected int getMaxStringSize() {
+ return MAX_STRING_SIZE;
+ }
+
public TaskStatus() {
taskid = new TaskAttemptID();
numSlots = 0;
@@ -74,8 +87,8 @@ public abstract class TaskStatus impleme
this.progress = progress;
this.numSlots = numSlots;
this.runState = runState;
- this.diagnosticInfo = diagnosticInfo;
- this.stateString = stateString;
+ setDiagnosticInfo(diagnosticInfo);
+ setStateString(stateString);
this.taskTracker = taskTracker;
this.phase = phase;
this.counters = counters;
@@ -97,12 +110,39 @@ public abstract class TaskStatus impleme
public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
public void setRunState(State runState) { this.runState = runState; }
public String getDiagnosticInfo() { return diagnosticInfo; }
- public void setDiagnosticInfo(String info) {
+ /**
+ * Append {@code info} to the diagnostic information of this task.
+ * The accumulated text is capped at {@link #getMaxStringSize()} characters:
+ * once the cap has been reached, further messages are only logged; if an
+ * append overflows the cap, the combined text is logged in full and then
+ * truncated to the cap.
+ *
+ * @param info the diagnostic text to append; {@code null} is ignored
+ */
+ public void setDiagnosticInfo(String info) {
+ // nothing to append; this also avoids the NullPointerException that
+ // String.concat(null) throws when diagnosticInfo is already set (e.g. a
+ // statusUpdate() with a status whose diagnostic info is null)
+ if (info == null) {
+ return;
+ }
+ // if the diag-info has already reached its max then log and return
+ if (diagnosticInfo != null
+ && diagnosticInfo.length() == getMaxStringSize()) {
+ LOG.info("task-diagnostic-info for task " + taskid + " : " + info);
+ return;
+ }
diagnosticInfo =
((diagnosticInfo == null) ? info : diagnosticInfo.concat(info));
+ // trim the string to getMaxStringSize() characters if needed; info is
+ // non-null here, so diagnosticInfo is non-null as well
+ if (diagnosticInfo.length() > getMaxStringSize()) {
+ LOG.info("task-diagnostic-info for task " + taskid + " : "
+ + diagnosticInfo);
+ diagnosticInfo = diagnosticInfo.substring(0, getMaxStringSize());
+ }
}
public String getStateString() { return stateString; }
- public void setStateString(String stateString) { this.stateString = stateString; }
+ /**
+ * Set the state string of this {@link TaskStatus}.
+ * A value longer than {@link #getMaxStringSize()} characters is logged in
+ * full and then truncated to that length. A {@code null} value is ignored
+ * and leaves the current state string unchanged.
+ *
+ * @param stateString the new state string; may be {@code null}
+ */
+ public void setStateString(String stateString) {
+ if (stateString == null) {
+ // keep the current state string
+ return;
+ }
+ final int limit = getMaxStringSize();
+ if (stateString.length() > limit) {
+ // record the complete string before it is cut down
+ LOG.info("state-string for task " + taskid + " : " + stateString);
+ this.stateString = stateString.substring(0, limit);
+ } else {
+ this.stateString = stateString;
+ }
+ }
/**
* Get the next record range which is going to be processed by Task.
@@ -341,7 +381,7 @@ public abstract class TaskStatus impleme
synchronized void statusUpdate(TaskStatus status) {
setProgress (status.getProgress());
this.runState = status.getRunState();
- this.stateString = status.getStateString();
+ setStateString(status.getStateString());
this.nextRecordRange = status.getNextRecordRange();
setDiagnosticInfo(status.getDiagnosticInfo());
@@ -430,8 +470,8 @@ public abstract class TaskStatus impleme
setProgress(in.readFloat());
this.numSlots = in.readInt();
this.runState = WritableUtils.readEnum(in, State.class);
- this.diagnosticInfo = Text.readString(in);
- this.stateString = Text.readString(in);
+ setDiagnosticInfo(Text.readString(in));
+ setStateString(Text.readString(in));
this.phase = WritableUtils.readEnum(in, Phase.class);
this.startTime = in.readLong();
this.finishTime = in.readLong();
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java?rev=923907&r1=923906&r2=923907&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java Tue Mar 16 18:00:00 2010
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import static org.junit.Assert.*;
public class TestTaskStatus {
+ private static final Log LOG = LogFactory.getLog(TestTaskStatus.class);
@Test
public void testMapTaskStatusStartAndFinishTimes() {
@@ -70,5 +73,135 @@ public class TestTaskStatus {
status.setFinishTime(currentTime);
assertEquals("Finish time of the task status not set correctly.",
currentTime, status.getFinishTime());
+
+ // test with null task-diagnostics
+ TaskStatus ts = ((TaskStatus)status.clone());
+ ts.setDiagnosticInfo(null);
+ ts.setDiagnosticInfo("");
+ ts.setStateString(null);
+ ts.setStateString("");
+ ((TaskStatus)status.clone()).statusUpdate(ts);
+
+ // test with null state-string
+ ((TaskStatus)status.clone()).statusUpdate(0, null, null);
+ ((TaskStatus)status.clone()).statusUpdate(0, "", null);
+ ((TaskStatus)status.clone()).statusUpdate(null, 0, "", null, 1);
+ }
+
+ /** Max string size used by the {@link TaskStatus} instances in this test. */
+ private static final int TEST_MAX_STRING_SIZE = 16;
+
+ /**
+ * Create a non-map {@link TaskStatus} whose diagnostic info and state string
+ * are both initialized to {@code info} and whose max string size is
+ * {@link #TEST_MAX_STRING_SIZE}.
+ */
+ private static TaskStatus createTaskStatus(String info) {
+ // getMaxStringSize() is already called from the TaskStatus constructor, so
+ // the override returns a constant instead of relying on subclass state
+ return new TaskStatus(null, 0, 0, null, info, info, null, null, null) {
+ @Override
+ protected int getMaxStringSize() {
+ return TEST_MAX_STRING_SIZE;
+ }
+
+ @Override
+ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+ }
+
+ @Override
+ public boolean getIsMap() {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Test the {@link TaskStatus} against large sized task-diagnostic-info and
+ * state-string. Does the following
+ * - create a TaskStatus such that the task-diagnostic-info and
+ * state-string are small strings and check their contents
+ * - append them with small string and check their contents
+ * - append them with large string and check their size (and that the
+ * truncated state string keeps its leading characters)
+ * - update the status using statusUpdate() calls and check the size/contents
+ * - create a TaskStatus with large string and check their size
+ */
+ @Test
+ public void testTaskDiagnosticsAndStateString() {
+ final int maxSize = TEST_MAX_STRING_SIZE;
+ // check the default case
+ String test = "hi";
+ TaskStatus status = createTaskStatus(test);
+ assertEquals("Small diagnostic info test failed",
+ test, status.getDiagnosticInfo());
+ assertEquals("Small state string test failed",
+ test, status.getStateString());
+
+ // now append some small string and check
+ String newDInfo = test.concat(test);
+ status.setDiagnosticInfo(test);
+ status.setStateString(newDInfo);
+ assertEquals("Small diagnostic info append failed",
+ newDInfo, status.getDiagnosticInfo());
+ assertEquals("Small state-string append failed",
+ newDInfo, status.getStateString());
+
+ // update the status with small state strings
+ TaskStatus newStatus = (TaskStatus)status.clone();
+ String newSInfo = "hi1";
+ newStatus.setStateString(newSInfo);
+ status.statusUpdate(newStatus);
+ newDInfo = newDInfo.concat(newStatus.getDiagnosticInfo());
+
+ assertEquals("Status-update on diagnostic-info failed",
+ newDInfo, status.getDiagnosticInfo());
+ assertEquals("Status-update on state-string failed",
+ newSInfo, status.getStateString());
+
+ newSInfo = "hi2";
+ status.statusUpdate(0, newSInfo, null);
+ assertEquals("Status-update on state-string failed",
+ newSInfo, status.getStateString());
+
+ newSInfo = "hi3";
+ status.statusUpdate(null, 0, newSInfo, null, 0);
+ assertEquals("Status-update on state-string failed",
+ newSInfo, status.getStateString());
+
+ // now append each with large string
+ String large = "hihihihihihihihihihi"; // 20 chars
+ status.setDiagnosticInfo(large);
+ status.setStateString(large);
+ assertEquals("Large diagnostic info append test failed",
+ maxSize, status.getDiagnosticInfo().length());
+ assertEquals("Large state-string append test failed",
+ maxSize, status.getStateString().length());
+ // truncation must keep the leading characters of the state string
+ assertEquals("Large state-string append kept the wrong characters",
+ large.substring(0, maxSize), status.getStateString());
+
+ // update a large status with large strings
+ newStatus.setDiagnosticInfo(large + "0");
+ newStatus.setStateString(large + "1");
+ status.statusUpdate(newStatus);
+ assertEquals("Status-update on diagnostic info failed",
+ maxSize, status.getDiagnosticInfo().length());
+ assertEquals("Status-update on state-string failed",
+ maxSize, status.getStateString().length());
+
+ status.statusUpdate(0, large + "2", null);
+ assertEquals("Status-update on state-string failed",
+ maxSize, status.getStateString().length());
+
+ status.statusUpdate(null, 0, large + "3", null, 0);
+ assertEquals("Status-update on state-string failed",
+ maxSize, status.getStateString().length());
+
+ // test passing large string in constructor
+ status = createTaskStatus(large);
+ assertEquals("Large diagnostic info test failed",
+ maxSize, status.getDiagnosticInfo().length());
+ assertEquals("Large state-string test failed",
+ maxSize, status.getStateString().length());
}
}