You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by yu...@apache.org on 2013/02/20 23:34:52 UTC

svn commit: r1448447 - in /incubator/ambari/trunk: ./ ambari-server/src/main/java/org/apache/ambari/eventdb/db/ ambari-server/src/main/java/org/apache/ambari/eventdb/model/ ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/ ambari-web/vendor/scripts/

Author: yusaku
Date: Wed Feb 20 22:34:52 2013
New Revision: 1448447

URL: http://svn.apache.org/r1448447
Log:
AMBARI-1457. Improve Job Diagnostics. (Billie Rinaldi via yusaku)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Jobs.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
    incubator/ambari/trunk/ambari-web/vendor/scripts/workflow_visualization.js

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Wed Feb 20 22:34:52 2013
@@ -42,6 +42,8 @@ Trunk (unreleased changes):
  accessible for demo/test purposes. (mahadev)
 
  IMPROVEMENTS
+
+ AMBARI-1457. Improve Job Diagnostics. (Billie Rinaldi via yusaku)
  
  AMBARI-1453. Move Ambari Web application config from initialize.js to
  another config file. (yusaku)

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/DBConnector.java Wed Feb 20 22:34:52 2013
@@ -37,13 +37,20 @@ public interface DBConnector {
   
   public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields field, boolean sortAscending, String searchWorkflowId,
       String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
-      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException;
+      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime)
+      throws IOException;
   
   public List<JobDBEntry> fetchJobDetails(String workflowID) throws IOException;
   
+  public List<JobDBEntry> fetchJobDetails(long minFinishTime, long maxStartTime) throws IOException;
+  
   public long[] fetchJobStartStopTimes(String jobID) throws IOException;
   
-  public List<TaskAttempt> fetchTaskAttempts(String jobID, String taskType) throws IOException;
+  public List<TaskAttempt> fetchJobTaskAttempts(String jobID) throws IOException;
+  
+  public List<TaskAttempt> fetchWorkflowTaskAttempts(String workflowID) throws IOException;
+  
+  public List<TaskAttempt> fetchTaskAttempts(long minFinishTime, long maxStartTime) throws IOException;
   
   public void close();
 }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java Wed Feb 20 22:34:52 2013
@@ -75,9 +75,17 @@ public class PostgresConnector implement
         + WorkflowFields.STARTTIME + ") as " + SummaryFields.youngest + ", max(" + WorkflowFields.STARTTIME + ") as " + SummaryFields.oldest + " FROM "
         + WORKFLOW_TABLE_NAME),
     FJD_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.WORKFLOWID.toString() + " = ?"),
+    FJD_TIMERANGE_PS("SELECT " + JobDBEntry.JOB_FIELDS + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.FINISHTIME.toString() + " >= ? AND "
+        + JobFields.SUBMITTIME.toString() + " <= ? ORDER BY " + JobFields.WORKFLOWID + ", " + JobFields.JOBID),
     FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
-    FTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? AND "
-        + TaskAttemptFields.TASKTYPE + " = ? ORDER BY " + TaskAttemptFields.STARTTIME);
+    FJTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? ORDER BY "
+        + TaskAttemptFields.STARTTIME),
+    FWTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", (SELECT " + JobFields.JOBID + " as id FROM " + JOB_TABLE_NAME
+        + " WHERE " + JobFields.WORKFLOWID + " = ?) AS jobs WHERE " + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = jobs.id "
+        + " ORDER BY " + TaskAttemptFields.JOBID + "," + TaskAttemptFields.STARTTIME + ", " + TaskAttemptFields.FINISHTIME),
+    FTA_TIMERANGE_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.FINISHTIME + " >= ? AND "
+        + TaskAttemptFields.STARTTIME + " <= ? AND (" + TaskAttemptFields.TASKTYPE + " = 'MAP' OR  " + TaskAttemptFields.TASKTYPE + " = 'REDUCE') ORDER BY "
+        + TaskAttemptFields.STARTTIME);
     
     private String statementString;
     
@@ -219,7 +227,8 @@ public class PostgresConnector implement
   @Override
   public DataTable fetchWorkflows(int offset, int limit, String searchTerm, int echo, WorkflowFields col, boolean sortAscending, String searchWorkflowId,
       String searchWorkflowName, String searchWorkflowType, String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes,
-      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime) throws IOException {
+      long minOutputBytes, long maxOutputBytes, long minDuration, long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime)
+      throws IOException {
     int total = 0;
     PreparedStatement ps = getPS(Statements.FW_COUNT_PS);
     ResultSet rs = null;
@@ -239,7 +248,7 @@ public class PostgresConnector implement
     }
     
     String searchClause = buildSearchClause(searchTerm, searchWorkflowId, searchWorkflowName, searchWorkflowType, searchUserName, minJobs, maxJobs,
-        minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
+        minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime, minFinishTime, maxFinishTime);
     List<WorkflowDBEntry> workflows = fetchWorkflows(getQualifiedPS(Statements.FW_PS, searchClause, col, sortAscending, offset, limit));
     Summary summary = fetchSummary(getQualifiedPS(Statements.FW_SUMMARY_PS, searchClause));
     DataTable table = new DataTable();
@@ -258,6 +267,28 @@ public class PostgresConnector implement
     return table;
   }
   
+  private static JobDBEntry getJobDBEntry(ResultSet rs) throws SQLException {
+    JobDBEntry j = new JobDBEntry();
+    j.setConfPath(JobFields.CONFPATH.getString(rs));
+    j.setSubmitTime(JobFields.SUBMITTIME.getLong(rs));
+    long finishTime = JobFields.FINISHTIME.getLong(rs);
+    if (finishTime > j.getSubmitTime())
+      j.setElapsedTime(finishTime - j.getSubmitTime());
+    else
+      j.setElapsedTime(0);
+    j.setInputBytes(JobFields.INPUTBYTES.getLong(rs));
+    j.setJobId(JobFields.JOBID.getString(rs));
+    j.setJobName(JobFields.JOBNAME.getString(rs));
+    j.setMaps(JobFields.MAPS.getInt(rs));
+    j.setOutputBytes(JobFields.OUTPUTBYTES.getLong(rs));
+    j.setReduces(JobFields.REDUCES.getInt(rs));
+    j.setStatus(JobFields.STATUS.getString(rs));
+    j.setUserName(JobFields.USERNAME.getString(rs));
+    j.setWorkflowEntityName(JobFields.WORKFLOWENTITYNAME.getString(rs));
+    j.setWorkflowId(JobFields.WORKFLOWID.getString(rs));
+    return j;
+  }
+  
   @Override
   public List<JobDBEntry> fetchJobDetails(String workflowId) throws IOException {
     PreparedStatement ps = getPS(Statements.FJD_PS);
@@ -267,25 +298,34 @@ public class PostgresConnector implement
       ps.setString(1, workflowId);
       rs = ps.executeQuery();
       while (rs.next()) {
-        JobDBEntry j = new JobDBEntry();
-        j.setConfPath(JobFields.CONFPATH.getString(rs));
-        j.setSubmitTime(JobFields.SUBMITTIME.getLong(rs));
-        long finishTime = JobFields.FINISHTIME.getLong(rs);
-        if (finishTime > j.getSubmitTime())
-          j.setElapsedTime(finishTime - j.getSubmitTime());
-        else
-          j.setElapsedTime(0);
-        j.setInputBytes(JobFields.INPUTBYTES.getLong(rs));
-        j.setJobId(JobFields.JOBID.getString(rs));
-        j.setJobName(JobFields.JOBNAME.getString(rs));
-        j.setMaps(JobFields.MAPS.getInt(rs));
-        j.setOutputBytes(JobFields.OUTPUTBYTES.getLong(rs));
-        j.setReduces(JobFields.REDUCES.getInt(rs));
-        j.setStatus(JobFields.STATUS.getString(rs));
-        j.setUserName(JobFields.USERNAME.getString(rs));
-        j.setWorkflowEntityName(JobFields.WORKFLOWENTITYNAME.getString(rs));
-        j.setWorkflowId(JobFields.WORKFLOWID.getString(rs));
-        jobs.add(j);
+        jobs.add(getJobDBEntry(rs));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
+      
+    }
+    return jobs;
+  }
+  
+  @Override
+  public List<JobDBEntry> fetchJobDetails(long minFinishTime, long maxStartTime) throws IOException {
+    PreparedStatement ps = getPS(Statements.FJD_TIMERANGE_PS);
+    List<JobDBEntry> jobs = new ArrayList<JobDBEntry>();
+    ResultSet rs = null;
+    try {
+      ps.setLong(1, minFinishTime);
+      ps.setLong(2, maxStartTime);
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        jobs.add(getJobDBEntry(rs));
       }
       rs.close();
     } catch (SQLException e) {
@@ -332,29 +372,84 @@ public class PostgresConnector implement
     return times;
   }
   
+  private static TaskAttempt getTaskAttempt(ResultSet rs) throws SQLException {
+    TaskAttempt t = new TaskAttempt();
+    t.setFinishTime(TaskAttemptFields.FINISHTIME.getLong(rs));
+    t.setInputBytes(TaskAttemptFields.INPUTBYTES.getLong(rs));
+    t.setJobId(TaskAttemptFields.JOBID.getString(rs));
+    t.setLocality(TaskAttemptFields.LOCALITY.getString(rs));
+    t.setMapFinishTime(TaskAttemptFields.MAPFINISHTIME.getLong(rs));
+    t.setOutputBytes(TaskAttemptFields.OUTPUTBYTES.getLong(rs));
+    t.setShuffleFinishTime(TaskAttemptFields.SHUFFLEFINISHTIME.getLong(rs));
+    t.setSortFinishTime(TaskAttemptFields.SORTFINISHTIME.getLong(rs));
+    t.setStartTime(TaskAttemptFields.STARTTIME.getLong(rs));
+    t.setStatus(TaskAttemptFields.STATUS.getString(rs));
+    t.setTaskAttemptId(TaskAttemptFields.TASKATTEMPTID.getString(rs));
+    t.setTaskType(TaskAttemptFields.TASKTYPE.getString(rs));
+    return t;
+  }
+  
   @Override
-  public List<TaskAttempt> fetchTaskAttempts(String jobID, String taskType) throws IOException {
-    PreparedStatement ps = getPS(Statements.FTA_PS);
+  public List<TaskAttempt> fetchTaskAttempts(long minFinishTime, long maxStartTime) throws IOException {
+    PreparedStatement ps = getPS(Statements.FTA_TIMERANGE_PS);
+    List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
+    ResultSet rs = null;
+    try {
+      ps.setLong(1, minFinishTime);
+      ps.setLong(2, maxStartTime);
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        taskAttempts.add(getTaskAttempt(rs));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
+    }
+    return taskAttempts;
+  }
+  
+  @Override
+  public List<TaskAttempt> fetchJobTaskAttempts(String jobID) throws IOException {
+    PreparedStatement ps = getPS(Statements.FJTA_PS);
     List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
     ResultSet rs = null;
     try {
       ps.setString(1, jobID);
-      ps.setString(2, taskType);
       rs = ps.executeQuery();
       while (rs.next()) {
-        TaskAttempt t = new TaskAttempt();
-        t.setFinishTime(TaskAttemptFields.FINISHTIME.getLong(rs));
-        t.setInputBytes(TaskAttemptFields.INPUTBYTES.getLong(rs));
-        t.setLocality(TaskAttemptFields.LOCALITY.getString(rs));
-        t.setMapFinishTime(TaskAttemptFields.MAPFINISHTIME.getLong(rs));
-        t.setOutputBytes(TaskAttemptFields.OUTPUTBYTES.getLong(rs));
-        t.setShuffleFinishTime(TaskAttemptFields.SHUFFLEFINISHTIME.getLong(rs));
-        t.setSortFinishTime(TaskAttemptFields.SORTFINISHTIME.getLong(rs));
-        t.setStartTime(TaskAttemptFields.STARTTIME.getLong(rs));
-        t.setStatus(TaskAttemptFields.STATUS.getString(rs));
-        t.setTaskAttemptId(TaskAttemptFields.TASKATTEMPTID.getString(rs));
-        t.setTaskType(TaskAttemptFields.TASKTYPE.getString(rs));
-        taskAttempts.add(t);
+        taskAttempts.add(getTaskAttempt(rs));
+      }
+      rs.close();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (rs != null)
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.error("Exception while closing ResultSet", e);
+        }
+    }
+    return taskAttempts;
+  }
+  
+  @Override
+  public List<TaskAttempt> fetchWorkflowTaskAttempts(String workflowId) throws IOException {
+    PreparedStatement ps = getPS(Statements.FWTA_PS);
+    List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
+    ResultSet rs = null;
+    try {
+      ps.setString(1, workflowId);
+      rs = ps.executeQuery();
+      while (rs.next()) {
+        taskAttempts.add(getTaskAttempt(rs));
       }
       rs.close();
     } catch (SQLException e) {
@@ -377,6 +472,7 @@ public class PostgresConnector implement
     synchronized (preparedStatements) {
       if (!preparedStatements.containsKey(statement)) {
         try {
+          // LOG.debug("preparing " + statement.getStatementString());
           preparedStatements.put(statement, db.prepareStatement(statement.getStatementString()));
         } catch (SQLException e) {
           throw new IOException(e);
@@ -451,7 +547,7 @@ public class PostgresConnector implement
   
   private static String buildSearchClause(String searchTerm, String searchWorkflowId, String searchWorkflowName, String searchWorkflowType,
       String searchUserName, int minJobs, int maxJobs, long minInputBytes, long maxInputBytes, long minOutputBytes, long maxOutputBytes, long minDuration,
-      long maxDuration, long minStartTime, long maxStartTime) {
+      long maxDuration, long minStartTime, long maxStartTime, long minFinishTime, long maxFinishTime) {
     StringBuilder sb = new StringBuilder();
     sb.append(WHERE);
     if (searchTerm != null && searchTerm.length() > 0) {
@@ -476,6 +572,7 @@ public class PostgresConnector implement
     addRangeSearch(sb, WorkflowFields.OUTPUTBYTES, minOutputBytes, maxOutputBytes);
     addRangeSearch(sb, WorkflowFields.DURATION, minDuration, maxDuration);
     addRangeSearch(sb, WorkflowFields.STARTTIME, minStartTime, maxStartTime);
+    addRangeSearch(sb, WorkflowFields.LASTUPDATETIME, minFinishTime, maxFinishTime);
     
     if (sb.length() == WHERE.length())
       return "";

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Jobs.java?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Jobs.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/Jobs.java Wed Feb 20 22:34:52 2013
@@ -29,6 +29,7 @@ import java.util.List;
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
 public class Jobs {
+  int numJobs;
   List<JobDBEntry> jobs;
   
   public static class JobDBEntry {
@@ -195,11 +196,20 @@ public class Jobs {
   
   public Jobs() {}
   
+  public int getNumJobs() {
+    return numJobs;
+  }
+  
+  public void setNumJobs(int numJobs) {
+    this.numJobs = numJobs;
+  }
+  
   public List<JobDBEntry> getJobs() {
     return jobs;
   }
   
   public void setJobs(List<JobDBEntry> jobs) {
     this.jobs = jobs;
+    this.numJobs = jobs.size();
   }
 }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java Wed Feb 20 22:34:52 2013
@@ -61,6 +61,7 @@ public class TaskAttempt {
   
   public static final String TASK_ATTEMPT_FIELDS = TaskAttemptFields.join();
   
+  private String jobId;
   private String taskAttemptId;
   private String taskType;
   private long startTime;
@@ -75,6 +76,14 @@ public class TaskAttempt {
   
   public TaskAttempt() {}
   
+  public String getJobId() {
+    return jobId;
+  }
+  
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+  
   public String getTaskAttemptId() {
     return taskAttemptId;
   }
@@ -134,19 +143,19 @@ public class TaskAttempt {
   public long getInputBytes() {
     return inputBytes;
   }
-
+  
   public long getOutputBytes() {
     return outputBytes;
   }
-
+  
   public void setInputBytes(long inputBytes) {
     this.inputBytes = inputBytes;
   }
-
+  
   public void setOutputBytes(long outputBytes) {
     this.outputBytes = outputBytes;
   }
-
+  
   public String getStatus() {
     return status;
   }

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (original)
+++ incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Wed Feb 20 22:34:52 2013
@@ -112,7 +112,8 @@ public class WorkflowJsonService {
       @DefaultValue("-1") @QueryParam("minInputBytes") long minInputBytes, @DefaultValue("-1") @QueryParam("maxInputBytes") long maxInputBytes,
       @DefaultValue("-1") @QueryParam("minOutputBytes") long minOutputBytes, @DefaultValue("-1") @QueryParam("maxOutputBytes") long maxOutputBytes,
       @DefaultValue("-1") @QueryParam("minDuration") long minDuration, @DefaultValue("-1") @QueryParam("maxDuration") long maxDuration,
-      @DefaultValue("-1") @QueryParam("minStartTime") long minStartTime, @DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime) {
+      @DefaultValue("-1") @QueryParam("minStartTime") long minStartTime, @DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime,
+      @DefaultValue("-1") @QueryParam("minFinishTime") long minFinishTime, @DefaultValue("-1") @QueryParam("maxFinishTime") long maxFinishTime) {
     
     if (start < 0)
       start = 0;
@@ -152,6 +153,9 @@ public class WorkflowJsonService {
       case 8: // startTime
         field = WorkflowFields.STARTTIME;
         break;
+      case 9: // lastUpdateTime
+        field = WorkflowFields.LASTUPDATETIME;
+        break;
       default:
         field = WorkflowFields.WORKFLOWID;
     }
@@ -161,7 +165,7 @@ public class WorkflowJsonService {
     try {
       conn = getConnector();
       table = conn.fetchWorkflows(start, amount, searchTerm, echo, field, sortAscending, workflowId, workflowName, workflowType, userName, minJobs, maxJobs,
-          minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
+          minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime, minFinishTime, maxFinishTime);
     } catch (IOException e) {
       e.printStackTrace();
     } finally {
@@ -175,12 +179,16 @@ public class WorkflowJsonService {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/job")
-  public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
+  public Jobs getJobs(@QueryParam("workflowId") String workflowId, @DefaultValue("-1") @QueryParam("startTime") long minFinishTime,
+      @DefaultValue("-1") @QueryParam("endTime") long maxStartTime) {
     Jobs jobs = new Jobs();
     PostgresConnector conn = null;
     try {
       conn = getConnector();
-      jobs.setJobs(conn.fetchJobDetails(workflowId));
+      if (workflowId != null)
+        jobs.setJobs(conn.fetchJobDetails(workflowId));
+      else if (maxStartTime >= minFinishTime)
+        jobs.setJobs(conn.fetchJobDetails(minFinishTime, maxStartTime));
     } catch (IOException e) {
       e.printStackTrace();
       jobs.setJobs(EMPTY_JOBS);
@@ -195,20 +203,35 @@ public class WorkflowJsonService {
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/task")
-  public TaskData getTaskDetails(@QueryParam("jobId") String jobId, @QueryParam("width") int steps) {
+  public TaskData getTaskSummary(@QueryParam("jobId") String jobId, @QueryParam("width") int steps,
+      @DefaultValue("-1") @QueryParam("startTime") long minFinishTime, @DefaultValue("-1") @QueryParam("endTime") long maxStartTime) {
     TaskData points = new TaskData();
     PostgresConnector conn = null;
     try {
       conn = getConnector();
-      long[] times = conn.fetchJobStartStopTimes(jobId);
-      if (times != null) {
-        double submitTimeSecs = times[0] / 1000.0;
-        double finishTimeSecs = times[1] / 1000.0;
+      List<TaskAttempt> taskAttempts = null;
+      long startTime = -1;
+      long endTime = -1;
+      if (jobId != null) {
+        long[] times = conn.fetchJobStartStopTimes(jobId);
+        if (times != null) {
+          startTime = times[0];
+          endTime = times[1];
+          taskAttempts = conn.fetchJobTaskAttempts(jobId);
+        }
+      } else {
+        startTime = minFinishTime;
+        endTime = maxStartTime;
+        taskAttempts = conn.fetchTaskAttempts(minFinishTime, maxStartTime);
+      }
+      if (startTime > 0 && endTime > 0 && endTime >= startTime) {
+        double submitTimeSecs = startTime / 1000.0;
+        double finishTimeSecs = endTime / 1000.0;
         double step = (finishTimeSecs - submitTimeSecs) / steps;
         if (step < 1)
           step = 1;
-        getMapDetails(conn, points, jobId, submitTimeSecs, finishTimeSecs, step);
-        getReduceDetails(conn, points, jobId, submitTimeSecs, finishTimeSecs, step);
+        if (taskAttempts != null)
+          getTaskDetails(taskAttempts, points, submitTimeSecs, finishTimeSecs, step);
       }
     } catch (IOException e) {
       e.printStackTrace();
@@ -222,8 +245,31 @@ public class WorkflowJsonService {
   
   @GET
   @Produces(MediaType.APPLICATION_JSON)
+  @Path("/taskdetails")
+  public List<TaskAttempt> getTaskDetails(@QueryParam("jobId") String jobId, @QueryParam("workflowId") String workflowId) {
+    List<TaskAttempt> taskAttempts = new ArrayList<TaskAttempt>();
+    PostgresConnector conn = null;
+    try {
+      conn = getConnector();
+      if (jobId != null) {
+        taskAttempts = conn.fetchJobTaskAttempts(jobId);
+      } else if (workflowId != null) {
+        taskAttempts = conn.fetchWorkflowTaskAttempts(workflowId);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (conn != null) {
+        conn.close();
+      }
+    }
+    return taskAttempts;
+  }
+  
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
   @Path("/tasklocality")
-  public TaskLocalityData getTaskLocalityDetails(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
+  public TaskLocalityData getTaskLocalitySummary(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
       @DefaultValue("24") @QueryParam("maxr") int maxr) {
     if (maxr < minr)
       maxr = minr;
@@ -245,38 +291,32 @@ public class WorkflowJsonService {
     return data;
   }
   
-  private static void getMapDetails(PostgresConnector conn, TaskData points, String jobId, double submitTimeSecs, double finishTimeSecs, double step)
+  private static void getTaskDetails(List<TaskAttempt> taskAttempts, TaskData points, double submitTimeSecs, double finishTimeSecs, double step)
       throws IOException {
-    List<TaskAttempt> taskAttempts = conn.fetchTaskAttempts(jobId, "MAP");
     List<Point> mapPoints = new ArrayList<Point>();
-    for (double time = submitTimeSecs; time < finishTimeSecs; time += step) {
-      int numTasks = 0;
-      for (TaskAttempt taskAttempt : taskAttempts)
-        if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time)
-          numTasks++;
-      mapPoints.add(new Point(Math.round(time), numTasks));
-    }
-    points.setMapData(mapPoints);
-  }
-  
-  private static void getReduceDetails(PostgresConnector conn, TaskData points, String jobId, double submitTimeSecs, double finishTimeSecs, double step)
-      throws IOException {
-    List<TaskAttempt> taskAttempts = conn.fetchTaskAttempts(jobId, "REDUCE");
     List<Point> shufflePoints = new ArrayList<Point>();
     List<Point> reducePoints = new ArrayList<Point>();
     for (double time = submitTimeSecs; time < finishTimeSecs; time += step) {
+      int numTasks = 0;
       int numShuffleTasks = 0;
       int numReduceTasks = 0;
       for (TaskAttempt taskAttempt : taskAttempts) {
-        if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getShuffleFinishTime() / 1000.0) >= time) {
-          numShuffleTasks++;
-        } else if ((taskAttempt.getShuffleFinishTime() / 1000.0) < (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time) {
-          numReduceTasks++;
+        if (taskAttempt.getTaskType().equals("MAP")) {
+          if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time)
+            numTasks++;
+        } else if (taskAttempt.getTaskType().equals("REDUCE")) {
+          if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getShuffleFinishTime() / 1000.0) >= time) {
+            numShuffleTasks++;
+          } else if ((taskAttempt.getShuffleFinishTime() / 1000.0) < (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time) {
+            numReduceTasks++;
+          }
         }
       }
+      mapPoints.add(new Point(Math.round(time), numTasks));
       shufflePoints.add(new Point(Math.round(time), numShuffleTasks));
       reducePoints.add(new Point(Math.round(time), numReduceTasks));
     }
+    points.setMapData(mapPoints);
     points.setShuffleData(shufflePoints);
     points.setReduceData(reducePoints);
   }
@@ -285,15 +325,14 @@ public class WorkflowJsonService {
       int maxr) throws IOException {
     long submitTimeX = transformX(submitTime);
     long finishTimeX = transformX(finishTime);
-    List<TaskAttempt> mapAttempts = conn.fetchTaskAttempts(jobId, "MAP");
-    List<TaskAttempt> reduceAttempts = conn.fetchTaskAttempts(jobId, "REDUCE");
-    Set<Long> xPoints = getXPoints(mapAttempts, reduceAttempts, submitTimeX, finishTimeX);
+    List<TaskAttempt> taskAttempts = conn.fetchJobTaskAttempts(jobId);
+    Set<Long> xPoints = getXPoints(taskAttempts, submitTimeX, finishTimeX);
     Long[] xList = xPoints.toArray(new Long[xPoints.size()]);
     MinMax io = new MinMax();
-    data.setMapNodeLocal(processLocalityData(mapAttempts, "NODE_LOCAL", xList, io));
-    data.setMapRackLocal(processLocalityData(mapAttempts, "RACK_LOCAL", xList, io));
-    data.setMapOffSwitch(processLocalityData(mapAttempts, "OFF_SWITCH", xList, io));
-    data.setReduceOffSwitch(processLocalityData(reduceAttempts, "OFF_SWITCH", xList, io));
+    data.setMapNodeLocal(processLocalityData(taskAttempts, "MAP", "NODE_LOCAL", xList, io));
+    data.setMapRackLocal(processLocalityData(taskAttempts, "MAP", "RACK_LOCAL", xList, io));
+    data.setMapOffSwitch(processLocalityData(taskAttempts, "MAP", "OFF_SWITCH", xList, io));
+    data.setReduceOffSwitch(processLocalityData(taskAttempts, "REDUCE", "OFF_SWITCH", xList, io));
     setRValues(data.getMapNodeLocal(), minr, maxr, io.max);
     setRValues(data.getMapRackLocal(), minr, maxr, io.max);
     setRValues(data.getMapOffSwitch(), minr, maxr, io.max);
@@ -319,7 +358,7 @@ public class WorkflowJsonService {
     return time;
   }
   
-  private static Set<Long> getXPoints(List<TaskAttempt> mapAttempts, List<TaskAttempt> reduceAttempts, long submitTimeX, long finishTimeX) {
+  private static Set<Long> getXPoints(List<TaskAttempt> taskAttempts, long submitTimeX, long finishTimeX) {
     TreeSet<Long> xPoints = new TreeSet<Long>();
     TreeSet<TaskAttempt> sortedAttempts = new TreeSet<TaskAttempt>(new Comparator<TaskAttempt>() {
       @Override
@@ -331,8 +370,7 @@ public class WorkflowJsonService {
         return t1.getTaskAttemptId().compareTo(t2.getTaskAttemptId());
       }
     });
-    sortedAttempts.addAll(mapAttempts);
-    sortedAttempts.addAll(reduceAttempts);
+    sortedAttempts.addAll(taskAttempts);
     getXPoints(sortedAttempts, xPoints);
     xPoints.add(submitTimeX);
     xPoints.add(finishTimeX);
@@ -362,11 +400,11 @@ public class WorkflowJsonService {
     return index;
   }
   
-  private static List<DataPoint> processLocalityData(List<TaskAttempt> taskAttempts, String locality, Long[] xPoints, MinMax io) {
+  private static List<DataPoint> processLocalityData(List<TaskAttempt> taskAttempts, String taskType, String locality, Long[] xPoints, MinMax io) {
     List<DataPoint> data = new ArrayList<DataPoint>();
     int i = 0;
     for (TaskAttempt taskAttempt : taskAttempts) {
-      if (locality.equals(taskAttempt.getLocality())) {
+      if (taskType.equals(taskAttempt.getTaskType()) && locality.equals(taskAttempt.getLocality())) {
         DataPoint point = new DataPoint();
         point.setX(transformX(taskAttempt.getStartTime()));
         point.setY(transformY(taskAttempt.getFinishTime() - taskAttempt.getStartTime()));

Modified: incubator/ambari/trunk/ambari-web/vendor/scripts/workflow_visualization.js
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-web/vendor/scripts/workflow_visualization.js?rev=1448447&r1=1448446&r2=1448447&view=diff
==============================================================================
--- incubator/ambari/trunk/ambari-web/vendor/scripts/workflow_visualization.js (original)
+++ incubator/ambari/trunk/ambari-web/vendor/scripts/workflow_visualization.js Wed Feb 20 22:34:52 2013
@@ -85,15 +85,14 @@ DagViewer.prototype._addLink = function 
 //                 nodeHeight = 15, labelFontSize = 10, maxLabelWidth = 120
 //                 nodeHeight = 40, labelFontSize = 20, maxLabelWidth = 260
 //                 nodeHeight = 30, labelFontSize = 16
-DagViewer.prototype.drawDag = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding, svgPadding) {
-  this._addTimelineGraph(svgw, svgh, nodeHeight || 20, labelFontSize || 14, maxLabelWidth || 180, axisPadding || 30, svgPadding || 20);
+DagViewer.prototype.drawDag = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding) {
+  this._addTimelineGraph(svgw, svgh, nodeHeight || 20, labelFontSize || 14, maxLabelWidth || 180, axisPadding || 30);
   return this;
 }
 
 // draw timeline graph
-DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding, svgPadding) {
-  // want to avoid having unnecessary scrollbars, so we need to size the div slightly larger than the svg
-  svgw = svgw - svgPadding;
+DagViewer.prototype._addTimelineGraph = function (svgw, svgh, nodeHeight, labelFontSize, maxLabelWidth, axisPadding) {
+  svgw = svgw;
 
   var margin = {"top":10, "bottom":10, "left":30, "right":30};
   var w = svgw - margin.left - margin.right;
@@ -157,15 +156,26 @@ DagViewer.prototype._addTimelineGraph = 
   }
 
   var h = 2*axisPadding + 2*nodeHeight*(maxIndex+1);
-  d3.select("div#" + this._id)
-    .attr("style","width:"+(svgw+svgPadding)+"px;height:"+Math.min(svgh,h+margin.top+margin.bottom+svgPadding)+"px;overflow:auto;padding:none;");
-  svgh = h + margin.top + margin.bottom;
+  var realh = svgh - margin.top - margin.bottom;
+  var scale = 1;
+  if (h > realh)
+    scale = realh / h;
+  svgh = Math.min(svgh, h + margin.top + margin.bottom);
   var svg = d3.select("div#" + this._id).append("svg:svg")
     .attr("width", svgw+"px")
     .attr("height", svgh+"px");
+    
   var svgg = svg.append("g")
-    .attr("transform", "translate("+margin.left+","+margin.top+")");
-  
+    .attr("transform", "translate("+margin.left+","+margin.top+") scale("+scale+")");
+  // add a white background rectangle at the origin of the transformed group,
+  // beneath everything else, so mouse doesn't have to be over nodes for panning/zooming
+  svgg.append("svg:rect")
+    .attr("x", 0)
+    .attr("y", 0)
+    .attr("width", svgw)
+    .attr("height", svgh/scale)
+    .attr("style", "fill:white;stroke:none");
+ 
   // create axes
   var x = d3.time.scale()
     .domain([0, elapsedTime])
@@ -196,18 +206,17 @@ DagViewer.prototype._addTimelineGraph = 
     return weeks + "w " + (x==0 ? "" : " " + x + "d");
   };
   var topAxis = d3.svg.axis()
-    .scale(x)
-    .orient("bottom")
-    .tickFormat(tickFormatter);
+    .scale(d3.time.scale().domain([startTime, startTime+elapsedTime]).range([0, w]))
+    .orient("bottom");
   var bottomAxis = d3.svg.axis()
     .scale(x)
     .orient("top")
     .tickFormat(tickFormatter);
   svgg.append("g")
-    .attr("class", "x axis")
+    .attr("class", "x axis top")
     .call(topAxis);
   svgg.append("g")
-    .attr("class", "x axis")
+    .attr("class", "x axis bottom")
     .call(bottomAxis)
     .attr("transform", "translate(0,"+h+")");
   
@@ -313,7 +322,8 @@ DagViewer.prototype._addTimelineGraph = 
     .attr("x", function(d) {
       var goal = d.x + d.w/2;
       var halfLabel = maxLabelWidth/2;
-      if (goal < halfLabel) return halfLabel;      else if (goal > w-halfLabel) return w-halfLabel;
+      if (goal < halfLabel) return halfLabel;
+      else if (goal > w-halfLabel) return w-halfLabel;
       return goal;
     } )
     .attr("y", function(d) { return d.y + d.h + labelFontSize; } )
@@ -338,4 +348,10 @@ DagViewer.prototype._addTimelineGraph = 
     .text(function (d) {
       return d.name;
     });
+
+  svg.call(d3.behavior.zoom().on("zoom", function() {
+    var left = Math.min(Math.max(d3.event.translate[0]+margin.left, margin.left-w*d3.event.scale*scale), margin.left+w);
+    var top = Math.min(Math.max(d3.event.translate[1]+margin.top, margin.top-h*d3.event.scale*scale), margin.top+h);
+    svgg.attr("transform", "translate("+left+","+top+") scale("+(d3.event.scale*scale)+")");
+  }));
 }