You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by bi...@apache.org on 2013/08/30 22:35:15 UTC

git commit: AMBARI-2759. ambari-log4j doesn't work with MySQL. (Chen Chun via billie)

Updated Branches:
  refs/heads/trunk 8f5b66286 -> 44cc11dfb


AMBARI-2759. ambari-log4j doesn't work with MySQL. (Chen Chun via billie)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/44cc11df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/44cc11df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/44cc11df

Branch: refs/heads/trunk
Commit: 44cc11dfb38b5dd3ce770fcd3a632c26ae37a596
Parents: 8f5b662
Author: Billie Rinaldi <bi...@gmail.com>
Authored: Fri Aug 30 13:34:01 2013 -0700
Committer: Billie Rinaldi <bi...@gmail.com>
Committed: Fri Aug 30 13:34:01 2013 -0700

----------------------------------------------------------------------
 .../ambari/eventdb/db/PostgresConnector.java    |  5 ++-
 .../ambari/eventdb/model/TaskAttempt.java       |  7 ++++
 .../jobhistory/MapReduceJobHistoryUpdater.java  | 40 ++++++++++++--------
 3 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java b/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
index 6b0d36c..e96fe92 100644
--- a/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
+++ b/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
@@ -81,8 +81,9 @@ public class PostgresConnector implements DBConnector {
     FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
     FJTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? ORDER BY "
         + TaskAttemptFields.STARTTIME),
-    FWTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", (SELECT " + JobFields.JOBID + " as id FROM " + JOB_TABLE_NAME
-        + " WHERE " + JobFields.WORKFLOWID + " = ?) AS jobs WHERE " + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = jobs.id "
+    FWTA_PS("SELECT " + TaskAttemptFields.join(TASK_ATTEMPT_TABLE_NAME) + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", " + JOB_TABLE_NAME + " WHERE "
+        + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = " + JOB_TABLE_NAME + "." + JobFields.JOBID + " AND " + JOB_TABLE_NAME + "."
+        + JobFields.WORKFLOWID + " = ?"
         + " ORDER BY " + TaskAttemptFields.JOBID + "," + TaskAttemptFields.STARTTIME + ", " + TaskAttemptFields.FINISHTIME),
     FTA_TIMERANGE_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.FINISHTIME + " >= ? AND "
         + TaskAttemptFields.STARTTIME + " <= ? AND (" + TaskAttemptFields.TASKTYPE + " = 'MAP' OR  " + TaskAttemptFields.TASKTYPE + " = 'REDUCE') ORDER BY "

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java b/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
index 0b0914c..526d63d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
+++ b/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
@@ -57,6 +57,13 @@ public class TaskAttempt {
         tmp[i] = TaskAttemptFields.values()[i].toString();
       return StringUtils.join(tmp, ",");
     }
+
+    public static String join(String tableName) {
+      String[] tmp = new String[TaskAttemptFields.values().length];
+      for (int i = 0; i < tmp.length; i++)
+        tmp[i] = tableName + "." + TaskAttemptFields.values()[i].toString();
+      return StringUtils.join(tmp, ",");
+    }
   }
   
   public static final String TASK_ATTEMPT_FIELDS = TaskAttemptFields.join();

http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java b/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
index aec5415..d62494b 100644
--- a/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
+++ b/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
@@ -162,9 +162,7 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
                 "workflowContext = ?, " +
                 "numJobsTotal = ?, " +
                 "lastUpdateTime = ?, " +
-                "duration = ? - (SELECT startTime FROM " +
-                WORKFLOW_TABLE +
-                " WHERE workflowId = ?) " +
+                "duration = ? - startTime " +
                 "WHERE workflowId = ?"
             );
     
@@ -174,20 +172,31 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
                 WORKFLOW_TABLE +
                 " SET " +
                 "lastUpdateTime = ?, " +
-                "duration = ? - (SELECT startTime FROM " +
-                WORKFLOW_TABLE +
-                " WHERE workflowId = selectid), " +
-                "numJobsCompleted = rows, " +
-                "inputBytes = input, " +
-                "outputBytes = output " +
-            "FROM (SELECT count(*) as rows, sum(inputBytes) as input, " +
-                "sum(outputBytes) as output, workflowId as selectid FROM " +
-                JOB_TABLE +
+                "duration = ? - startTime, " +
+                "numJobsCompleted = (" +
+                  "SELECT count(*)" +
+                  " FROM " +
+                  JOB_TABLE +
+                  " WHERE " +
+                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+                  " AND status = 'SUCCESS'), " +
+                "inputBytes = (" +
+                  "SELECT sum(inputBytes)" +
+                  " FROM " +
+                  JOB_TABLE +
+                  " WHERE " +
+                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+                  " AND status = 'SUCCESS'), " +
+                "outputBytes = (" +
+                  "SELECT sum(outputBytes)" +
+                  " FROM " +
+                  JOB_TABLE +
+                  " WHERE " +
+                  "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+                  " AND status = 'SUCCESS') " +
                 " WHERE workflowId = (SELECT workflowId FROM " +
                 JOB_TABLE +
-                " WHERE jobId = ?) AND status = 'SUCCESS' " +
-                "GROUP BY workflowId) as jobsummary " +
-            "WHERE workflowId = selectid"
+                " WHERE jobId = ?)"
             );
     
     // JobFinishedEvent
@@ -726,7 +735,6 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
         workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime());
         workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime());
         workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId());
-        workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId());
         workflowUpdateTimePS.executeUpdate();
         LOG.debug("Successfully updated workflowId = " + 
             workflowContext.getWorkflowId());