You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@ambari.apache.org by ni...@apache.org on 2016/12/30 08:10:12 UTC
ambari git commit: AMBARI-19062 : in hive view directly calling DB and removed ATS calls when fetching history data. (nitirajrathore)
Repository: ambari
Updated Branches:
refs/heads/branch-2.5 86b86d458 -> 4149aaa5e
AMBARI-19062 : in hive view directly calling DB and removed ATS calls when fetching history data. (nitirajrathore)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4149aaa5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4149aaa5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4149aaa5
Branch: refs/heads/branch-2.5
Commit: 4149aaa5e03d519256bd0735b6498cb846c77a47
Parents: 86b86d4
Author: Nitiraj Rathore <ni...@gmail.com>
Authored: Fri Dec 30 13:38:41 2016 +0530
Committer: Nitiraj Rathore <ni...@gmail.com>
Committed: Fri Dec 30 13:39:26 2016 +0530
----------------------------------------------------------------------
.../view/hive2/resources/jobs/Aggregator.java | 137 ++++++++++---------
1 file changed, 73 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4149aaa5/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
index 99faeca..d399c47 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
@@ -19,11 +19,15 @@
package org.apache.ambari.view.hive2.resources.jobs;
import akka.actor.ActorRef;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation;
import org.apache.ambari.view.hive2.persistence.utils.FilteringStrategy;
import org.apache.ambari.view.hive2.persistence.utils.Indexed;
import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive2.persistence.utils.OnlyOwnersFilteringStrategy;
import org.apache.ambari.view.hive2.resources.IResourceManager;
import org.apache.ambari.view.hive2.resources.files.FileService;
import org.apache.ambari.view.hive2.resources.jobs.atsJobs.HiveQueryId;
@@ -36,8 +40,8 @@ import org.apache.commons.beanutils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -84,13 +88,9 @@ public class Aggregator {
* @param endTime: exclusive, time in secs from epoch
* @return: list of jobs
*/
- public List<Job> readAllForUserByTime(String username, long startTime, long endTime) {
- List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime);
- List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
- List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime);
- allJobs.addAll(dbOnlyJobs);
-
- return allJobs;
+ public List<Job> readAllForUserByTime(String username, Long startTime, Long endTime) {
+ List<Job> jobs = readDBJobs(username, startTime, endTime);
+ return jobs;
}
/**
@@ -98,25 +98,22 @@ public class Aggregator {
* @param jobInfos: infos of job to get
* @return: list of updated Job
*/
- public List<Job> readJobsByIds(List<JobInfo> jobInfos) {
- //categorize jobs
- List<String> jobsWithHiveIds = new LinkedList<>();
- List<String> dbOnlyJobs = new LinkedList<>();
-
- for (JobInfo jobInfo : jobInfos) {
- if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) {
- dbOnlyJobs.add(jobInfo.getJobId());
- } else {
- jobsWithHiveIds.add(jobInfo.getHiveId());
+ public List<Job> readJobsByIds(final List<JobInfo> jobInfos) {
+ List<String> jobIds = FluentIterable.from(jobInfos).filter(new Predicate<JobInfo>() {
+ @Override
+ public boolean apply(@Nullable JobInfo input) {
+ return !Strings.isNullOrEmpty(input.getJobId());
}
- }
-
- List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds);
- List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
- List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs);
-
- allJobs.addAll(dbJobs);
- return allJobs;
+ }).transform(new Function<JobInfo, String>() {
+ @Nullable
+ @Override
+ public String apply(@Nullable JobInfo input) {
+ return input.getJobId();
+ }
+ }).toList();
+ List<Job> dbJobs = readJobsFromDbByJobId(jobIds);
+ LOG.debug("readJobsByIds: dbJobs : {}", dbJobs);
+ return dbJobs;
}
/**
@@ -124,17 +121,24 @@ public class Aggregator {
* @param jobsIds: list of ids of jobs
* @return: list of all the jobs found
*/
- private List<Job> readJobsFromDbByJobId(List<String> jobsIds) {
- List<Job> jobs = new LinkedList<>();
- for (final String jid : jobsIds) {
- try {
- Job job = getJobFromDbByJobId(jid);
- jobs.add(job);
- } catch (ItemNotFound itemNotFound) {
- LOG.error("Error while finding job with id : {}", jid, itemNotFound);
+ private List<Job> readJobsFromDbByJobId(final List<String> jobsIds) {
+ LOG.info("Reading jobs from db with ids : {} ", jobsIds);
+ List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() {
+ @Override
+ public boolean isConform(Indexed item) {
+ JobImpl job = (JobImpl) item;
+ return jobsIds.contains(job.getId());
}
- }
+ @Override
+ public String whereStatement() {
+ String query = " id in ( " + Joiner.on(",").join(jobsIds) + " ) ";
+ LOG.debug("where clause for jobsIds : {}", query);
+ return query;
+ }
+ });
+
+ LOG.debug("jobs returned from DB : {}" , jobs);
return jobs;
}
@@ -172,44 +176,49 @@ public class Aggregator {
* @return
*/
public List<Job> readAll(String username) {
- List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username);
- LOG.debug("HiveQueryIds fetched : {}", queries);
- List<Job> allJobs = fetchDagsAndMergeJobs(queries);
- List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null);
- LOG.debug("Jobs only present in DB: {}", dbOnlyJobs);
- allJobs.addAll(dbOnlyJobs);
- return allJobs;
+ return readAllForUserByTime(username, null, null);
}
/**
* reads all the jobs from DB for username and excludes the jobs mentioned in queries list
* @param username : username for which the jobs are to be read.
- * @param queries : the jobs to exclude
* @param startTime: can be null, if not then the window start time for job
* @param endTime: can be null, if not then the window end time for job
* @return : the jobs in db that are not in the queries
*/
- private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) {
- List<Job> dbOnlyJobs = new LinkedList<>();
- HashMap<String, String> operationIdVsHiveId = new HashMap<>();
-
- for (HiveQueryId hqid : queries) {
- operationIdVsHiveId.put(hqid.operationId, hqid.entity);
- }
- LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId);
- //cover case when operationId is present, but not exists in ATS
- //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE"
- List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username));
- for (Job job : jobs) {
- if (null != startTime && null != endTime && null != job.getDateSubmitted()
- && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime || operationIdVsHiveId.containsKey(job.getGuid()))
- ) {
- continue; // don't include this in the result
- } else {
- dbOnlyJobs.add(job);
+ private List<Job> readDBJobs(final String username, final Long startTime, final Long endTime) {
+ List<Job> jobs = viewJobResourceManager.readAll( new FilteringStrategy() {
+ @Override
+ public boolean isConform(Indexed item) {
+ JobImpl job = (JobImpl) item;
+ return job.getOwner().compareTo(username) == 0 &&
+ ( (null == startTime || job.getDateSubmitted() >= startTime ) &&
+ ( null == endTime || job.getDateSubmitted() < endTime )
+ );
}
- }
- return dbOnlyJobs;
+ @Override
+ public String whereStatement() {
+ StringBuilder sb = new StringBuilder( "owner = '" ).append( username ).append( "'" );
+ if( null != startTime || null != endTime ) {
+ sb.append(" AND ( " );
+ if( null != startTime ) {
+ sb.append( " dateSubmitted >= " ).append( startTime );
+ }
+ if( null != endTime ){
+ if(null != startTime){
+ sb.append(" AND ");
+ }
+ sb.append(" dateSubmitted < ").append(endTime);
+ }
+ sb.append( " ) " );
+ }
+ String where = sb.toString();
+ LOG.debug("where statement : {}", where);
+ return where;
+ }
+ });
+ LOG.debug("returning jobs: {}", jobs);
+ return jobs;
}
private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) {