You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by bi...@apache.org on 2013/08/30 22:35:15 UTC
git commit: AMBARI-2759. ambari-log4j doesn't work with MySQL. (Chen
Chun via billie)
Updated Branches:
refs/heads/trunk 8f5b66286 -> 44cc11dfb
AMBARI-2759. ambari-log4j doesn't work with MySQL. (Chen Chun via billie)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ambari/commit/44cc11df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ambari/tree/44cc11df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ambari/diff/44cc11df
Branch: refs/heads/trunk
Commit: 44cc11dfb38b5dd3ce770fcd3a632c26ae37a596
Parents: 8f5b662
Author: Billie Rinaldi <bi...@gmail.com>
Authored: Fri Aug 30 13:34:01 2013 -0700
Committer: Billie Rinaldi <bi...@gmail.com>
Committed: Fri Aug 30 13:34:01 2013 -0700
----------------------------------------------------------------------
.../ambari/eventdb/db/PostgresConnector.java | 5 ++-
.../ambari/eventdb/model/TaskAttempt.java | 7 ++++
.../jobhistory/MapReduceJobHistoryUpdater.java | 40 ++++++++++++--------
3 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java b/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
index 6b0d36c..e96fe92 100644
--- a/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
+++ b/ambari-server/src/main/java/org/apache/ambari/eventdb/db/PostgresConnector.java
@@ -81,8 +81,9 @@ public class PostgresConnector implements DBConnector {
FJSS_PS("SELECT " + JobFields.SUBMITTIME + ", " + JobFields.FINISHTIME + " FROM " + JOB_TABLE_NAME + " WHERE " + JobFields.JOBID + " = ?"),
FJTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.JOBID + " = ? ORDER BY "
+ TaskAttemptFields.STARTTIME),
- FWTA_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", (SELECT " + JobFields.JOBID + " as id FROM " + JOB_TABLE_NAME
- + " WHERE " + JobFields.WORKFLOWID + " = ?) AS jobs WHERE " + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = jobs.id "
+ FWTA_PS("SELECT " + TaskAttemptFields.join(TASK_ATTEMPT_TABLE_NAME) + " FROM " + TASK_ATTEMPT_TABLE_NAME + ", " + JOB_TABLE_NAME + " WHERE "
+ + TASK_ATTEMPT_TABLE_NAME + "." + TaskAttemptFields.JOBID + " = " + JOB_TABLE_NAME + "." + JobFields.JOBID + " AND " + JOB_TABLE_NAME + "."
+ + JobFields.WORKFLOWID + " = ?"
+ " ORDER BY " + TaskAttemptFields.JOBID + "," + TaskAttemptFields.STARTTIME + ", " + TaskAttemptFields.FINISHTIME),
FTA_TIMERANGE_PS("SELECT " + TaskAttempt.TASK_ATTEMPT_FIELDS + " FROM " + TASK_ATTEMPT_TABLE_NAME + " WHERE " + TaskAttemptFields.FINISHTIME + " >= ? AND "
+ TaskAttemptFields.STARTTIME + " <= ? AND (" + TaskAttemptFields.TASKTYPE + " = 'MAP' OR " + TaskAttemptFields.TASKTYPE + " = 'REDUCE') ORDER BY "
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java b/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
index 0b0914c..526d63d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
+++ b/ambari-server/src/main/java/org/apache/ambari/eventdb/model/TaskAttempt.java
@@ -57,6 +57,13 @@ public class TaskAttempt {
tmp[i] = TaskAttemptFields.values()[i].toString();
return StringUtils.join(tmp, ",");
}
+
+ public static String join(String tableName) {
+ String[] tmp = new String[TaskAttemptFields.values().length];
+ for (int i = 0; i < tmp.length; i++)
+ tmp[i] = tableName + "." + TaskAttemptFields.values()[i].toString();
+ return StringUtils.join(tmp, ",");
+ }
}
public static final String TASK_ATTEMPT_FIELDS = TaskAttemptFields.join();
http://git-wip-us.apache.org/repos/asf/incubator-ambari/blob/44cc11df/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
----------------------------------------------------------------------
diff --git a/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java b/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
index aec5415..d62494b 100644
--- a/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
+++ b/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
@@ -162,9 +162,7 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
"workflowContext = ?, " +
"numJobsTotal = ?, " +
"lastUpdateTime = ?, " +
- "duration = ? - (SELECT startTime FROM " +
- WORKFLOW_TABLE +
- " WHERE workflowId = ?) " +
+ "duration = ? - startTime " +
"WHERE workflowId = ?"
);
@@ -174,20 +172,31 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
WORKFLOW_TABLE +
" SET " +
"lastUpdateTime = ?, " +
- "duration = ? - (SELECT startTime FROM " +
- WORKFLOW_TABLE +
- " WHERE workflowId = selectid), " +
- "numJobsCompleted = rows, " +
- "inputBytes = input, " +
- "outputBytes = output " +
- "FROM (SELECT count(*) as rows, sum(inputBytes) as input, " +
- "sum(outputBytes) as output, workflowId as selectid FROM " +
- JOB_TABLE +
+ "duration = ? - startTime, " +
+ "numJobsCompleted = (" +
+ "SELECT count(*)" +
+ " FROM " +
+ JOB_TABLE +
+ " WHERE " +
+ "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+ " AND status = 'SUCCESS'), " +
+ "inputBytes = (" +
+ "SELECT sum(inputBytes)" +
+ " FROM " +
+ JOB_TABLE +
+ " WHERE " +
+ "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+ " AND status = 'SUCCESS'), " +
+ "outputBytes = (" +
+ "SELECT sum(outputBytes)" +
+ " FROM " +
+ JOB_TABLE +
+ " WHERE " +
+ "workflowId = " + WORKFLOW_TABLE + ".workflowId" +
+ " AND status = 'SUCCESS') " +
" WHERE workflowId = (SELECT workflowId FROM " +
JOB_TABLE +
- " WHERE jobId = ?) AND status = 'SUCCESS' " +
- "GROUP BY workflowId) as jobsummary " +
- "WHERE workflowId = selectid"
+ " WHERE jobId = ?)"
);
// JobFinishedEvent
@@ -726,7 +735,6 @@ public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
workflowUpdateTimePS.setLong(3, historyEvent.getSubmitTime());
workflowUpdateTimePS.setLong(4, historyEvent.getSubmitTime());
workflowUpdateTimePS.setString(5, workflowContext.getWorkflowId());
- workflowUpdateTimePS.setString(6, workflowContext.getWorkflowId());
workflowUpdateTimePS.executeUpdate();
LOG.debug("Successfully updated workflowId = " +
workflowContext.getWorkflowId());