You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2010/03/16 19:00:00 UTC

svn commit: r923907 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskStatus.java src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java

Author: szetszwo
Date: Tue Mar 16 18:00:00 2010
New Revision: 923907

URL: http://svn.apache.org/viewvc?rev=923907&view=rev
Log:
MAPREDUCE-1482. Truncate state string and diagnostic information in TaskStatus. Contributed by Amar Kamat

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=923907&r1=923906&r2=923907&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Mar 16 18:00:00 2010
@@ -211,6 +211,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1403. Save the size and number of distributed cache artifacts in
     the configuration. (Arun Murthy via cdouglas)
 
+    MAPREDUCE-1482. Truncate state string and diagnostic information in
+    TaskStatus. (Amar Kamat via szetszwo)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=923907&r1=923906&r2=923907&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar 16 18:00:00 2010
@@ -60,7 +60,20 @@ public abstract class TaskStatus impleme
   private Counters counters;
   private boolean includeCounters;
   private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
+  
+  // max task-status string size
+  static final int MAX_STRING_SIZE = 1024;
 
+  /**
+   * Testcases can override {@link #getMaxStringSize()} to control the max-size 
+   * of strings in {@link TaskStatus}. Note that the {@link TaskStatus} is never
+   * exposed to clients or users (i.e. Map or Reduce) and hence users cannot 
+   * override this API to pass large strings in {@link TaskStatus}.
+   */
+  protected int getMaxStringSize() {
+    return MAX_STRING_SIZE;
+  }
+  
   public TaskStatus() {
     taskid = new TaskAttemptID();
     numSlots = 0;
@@ -74,8 +87,8 @@ public abstract class TaskStatus impleme
     this.progress = progress;
     this.numSlots = numSlots;
     this.runState = runState;
-    this.diagnosticInfo = diagnosticInfo;
-    this.stateString = stateString;
+    setDiagnosticInfo(diagnosticInfo);
+    setStateString(stateString);
     this.taskTracker = taskTracker;
     this.phase = phase;
     this.counters = counters;
@@ -97,12 +110,39 @@ public abstract class TaskStatus impleme
   public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
   public void setRunState(State runState) { this.runState = runState; }
   public String getDiagnosticInfo() { return diagnosticInfo; }
-  public void setDiagnosticInfo(String info) { 
+  public void setDiagnosticInfo(String info) {
+    // if the diag-info has already reached its max then log and return
+    if (diagnosticInfo != null 
+        && diagnosticInfo.length() == getMaxStringSize()) {
+      LOG.info("task-diagnostic-info for task " + taskid + " : " + info);
+      return;
+    }
     diagnosticInfo = 
       ((diagnosticInfo == null) ? info : diagnosticInfo.concat(info)); 
+    // trim the string to getMaxStringSize() if needed
+    if (diagnosticInfo != null 
+        && diagnosticInfo.length() > getMaxStringSize()) {
+      LOG.info("task-diagnostic-info for task " + taskid + " : " 
+               + diagnosticInfo);
+      diagnosticInfo = diagnosticInfo.substring(0, getMaxStringSize());
+    }
   }
   public String getStateString() { return stateString; }
-  public void setStateString(String stateString) { this.stateString = stateString; }
+  /**
+   * Set the state string, truncated to {@link #getMaxStringSize()}.
+   */
+  public void setStateString(String stateString) {
+    if (stateString != null) {
+      if (stateString.length() <= getMaxStringSize()) {
+        this.stateString = stateString;
+      } else {
+        // log it
+        LOG.info("state-string for task " + taskid + " : " + stateString);
+        // trim the state string
+        this.stateString = stateString.substring(0, getMaxStringSize());
+      }
+    }
+  }
   
   /**
    * Get the next record range which is going to be processed by Task.
@@ -341,7 +381,7 @@ public abstract class TaskStatus impleme
   synchronized void statusUpdate(TaskStatus status) {
     setProgress (status.getProgress());
     this.runState = status.getRunState();
-    this.stateString = status.getStateString();
+    setStateString(status.getStateString());
     this.nextRecordRange = status.getNextRecordRange();
 
     setDiagnosticInfo(status.getDiagnosticInfo());
@@ -430,8 +470,8 @@ public abstract class TaskStatus impleme
     setProgress(in.readFloat());
     this.numSlots = in.readInt();
     this.runState = WritableUtils.readEnum(in, State.class);
-    this.diagnosticInfo = Text.readString(in);
-    this.stateString = Text.readString(in);
+    setDiagnosticInfo(Text.readString(in));
+    setStateString(Text.readString(in));
     this.phase = WritableUtils.readEnum(in, Phase.class); 
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java?rev=923907&r1=923906&r2=923907&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java Tue Mar 16 18:00:00 2010
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class TestTaskStatus {
+  private static final Log LOG = LogFactory.getLog(TestTaskStatus.class);
 
   @Test
   public void testMapTaskStatusStartAndFinishTimes() {
@@ -70,5 +73,135 @@ public class TestTaskStatus {
     status.setFinishTime(currentTime);
     assertEquals("Finish time of the task status not set correctly.",
         currentTime, status.getFinishTime());
+    
+    // test with null and empty task-diagnostics and state-string
+    TaskStatus ts = ((TaskStatus)status.clone());
+    ts.setDiagnosticInfo(null);
+    ts.setDiagnosticInfo("");
+    ts.setStateString(null);
+    ts.setStateString("");
+    ((TaskStatus)status.clone()).statusUpdate(ts);
+    
+    // test statusUpdate() with null and empty state-strings
+    ((TaskStatus)status.clone()).statusUpdate(0, null, null);
+    ((TaskStatus)status.clone()).statusUpdate(0, "", null);
+    ((TaskStatus)status.clone()).statusUpdate(null, 0, "", null, 1);
+  }
+  
+  /**
+   * Test the {@link TaskStatus} against large sized task-diagnostic-info and 
+   * state-string. Does the following:
+   *  - create Map/Reduce TaskStatus such that the task-diagnostic-info and 
+   *    state-string are small strings and check their contents
+   *  - append a small string to them and check their contents
+   *  - append a large string to them and check their size
+   *  - update the status using statusUpdate() calls and check the size/contents
+   *  - create Map/Reduce TaskStatus with large string and check their size
+   */
+  @Test
+  public void testTaskDiagnosticsAndStateString() {
+    // check the default case
+    String test = "hi";
+    final int maxSize = 16;
+    TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null, 
+                                       null) {
+      @Override
+      protected int getMaxStringSize() {
+        return maxSize;
+      }
+
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+    };
+    assertEquals("Small diagnostic info test failed", 
+                 status.getDiagnosticInfo(), test);
+    assertEquals("Small state string test failed", status.getStateString(), 
+                 test);
+    
+    // now append some small string and check
+    String newDInfo = test.concat(test);
+    status.setDiagnosticInfo(test);
+    status.setStateString(newDInfo);
+    assertEquals("Small diagnostic info append failed", 
+                 newDInfo, status.getDiagnosticInfo());
+    assertEquals("Small state-string append failed", 
+                 newDInfo, status.getStateString());
+    
+    // update the status with small state strings
+    TaskStatus newStatus = (TaskStatus)status.clone();
+    String newSInfo = "hi1";
+    newStatus.setStateString(newSInfo);
+    status.statusUpdate(newStatus);
+    newDInfo = newDInfo.concat(newStatus.getDiagnosticInfo());
+    
+    assertEquals("Status-update on diagnostic-info failed", 
+                 newDInfo, status.getDiagnosticInfo());
+    assertEquals("Status-update on state-string failed", 
+                 newSInfo, status.getStateString());
+    
+    newSInfo = "hi2";
+    status.statusUpdate(0, newSInfo, null);
+    assertEquals("Status-update on state-string failed", 
+                 newSInfo, status.getStateString());
+    
+    newSInfo = "hi3";
+    status.statusUpdate(null, 0, newSInfo, null, 0);
+    assertEquals("Status-update on state-string failed", 
+                 newSInfo, status.getStateString());
+    
+    
+    // now append each with large string
+    String large = "hihihihihihihihihihi"; // 20 chars
+    status.setDiagnosticInfo(large);
+    status.setStateString(large);
+    assertEquals("Large diagnostic info append test failed", 
+                 maxSize, status.getDiagnosticInfo().length());
+    assertEquals("Large state-string append test failed",
+                 maxSize, status.getStateString().length());
+    
+    // update a large status with large strings
+    newStatus.setDiagnosticInfo(large + "0");
+    newStatus.setStateString(large + "1");
+    status.statusUpdate(newStatus);
+    assertEquals("Status-update on diagnostic info failed",
+                 maxSize, status.getDiagnosticInfo().length());
+    assertEquals("Status-update on state-string failed", 
+                 maxSize, status.getStateString().length());
+    
+    status.statusUpdate(0, large + "2", null);
+    assertEquals("Status-update on state-string failed", 
+                 maxSize, status.getStateString().length());
+    
+    status.statusUpdate(null, 0, large + "3", null, 0);
+    assertEquals("Status-update on state-string failed", 
+                 maxSize, status.getStateString().length());
+    
+    // test passing large string in constructor
+    status = new TaskStatus(null, 0, 0, null, large, large, null, null, 
+        null) {
+      @Override
+      protected int getMaxStringSize() {
+        return maxSize;
+      }
+
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+    };
+    assertEquals("Large diagnostic info test failed", 
+                maxSize, status.getDiagnosticInfo().length());
+    assertEquals("Large state-string test failed", 
+                 maxSize, status.getStateString().length());
   }
 }