You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ni...@apache.org on 2016/12/30 08:10:12 UTC

ambari git commit: AMBARI-19062 : Hive view now queries the DB directly and no longer calls ATS when fetching history data. (nitirajrathore)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 86b86d458 -> 4149aaa5e


AMBARI-19062 : Hive view now queries the DB directly and no longer calls ATS when fetching history data. (nitirajrathore)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4149aaa5
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4149aaa5
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4149aaa5

Branch: refs/heads/branch-2.5
Commit: 4149aaa5e03d519256bd0735b6498cb846c77a47
Parents: 86b86d4
Author: Nitiraj Rathore <ni...@gmail.com>
Authored: Fri Dec 30 13:38:41 2016 +0530
Committer: Nitiraj Rathore <ni...@gmail.com>
Committed: Fri Dec 30 13:39:26 2016 +0530

----------------------------------------------------------------------
 .../view/hive2/resources/jobs/Aggregator.java   | 137 ++++++++++---------
 1 file changed, 73 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4149aaa5/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
index 99faeca..d399c47 100644
--- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
+++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/resources/jobs/Aggregator.java
@@ -19,11 +19,15 @@
 package org.apache.ambari.view.hive2.resources.jobs;
 
 import akka.actor.ActorRef;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
+import com.google.common.collect.FluentIterable;
 import org.apache.ambari.view.hive2.actor.message.job.SaveDagInformation;
 import org.apache.ambari.view.hive2.persistence.utils.FilteringStrategy;
 import org.apache.ambari.view.hive2.persistence.utils.Indexed;
 import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive2.persistence.utils.OnlyOwnersFilteringStrategy;
 import org.apache.ambari.view.hive2.resources.IResourceManager;
 import org.apache.ambari.view.hive2.resources.files.FileService;
 import org.apache.ambari.view.hive2.resources.jobs.atsJobs.HiveQueryId;
@@ -36,8 +40,8 @@ import org.apache.commons.beanutils.PropertyUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -84,13 +88,9 @@ public class Aggregator {
    * @param endTime:   exclusive, time in secs from epoch
    * @return: list of jobs
    */
-  public List<Job> readAllForUserByTime(String username, long startTime, long endTime) {
-    List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime);
-    List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
-    List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime);
-    allJobs.addAll(dbOnlyJobs);
-
-    return allJobs;
+  public List<Job> readAllForUserByTime(String username, Long startTime, Long endTime) {
+    List<Job> jobs = readDBJobs(username, startTime, endTime);
+    return jobs;
   }
 
   /**
@@ -98,25 +98,22 @@ public class Aggregator {
    * @param jobInfos: infos of job to get
    * @return: list of updated Job
    */
-  public List<Job> readJobsByIds(List<JobInfo> jobInfos) {
-    //categorize jobs
-    List<String> jobsWithHiveIds = new LinkedList<>();
-    List<String> dbOnlyJobs = new LinkedList<>();
-
-    for (JobInfo jobInfo : jobInfos) {
-      if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) {
-        dbOnlyJobs.add(jobInfo.getJobId());
-      } else {
-        jobsWithHiveIds.add(jobInfo.getHiveId());
+  public List<Job> readJobsByIds(final List<JobInfo> jobInfos) {
+    List<String> jobIds = FluentIterable.from(jobInfos).filter(new Predicate<JobInfo>() {
+      @Override
+      public boolean apply(@Nullable JobInfo input) {
+        return !Strings.isNullOrEmpty(input.getJobId());
       }
-    }
-
-    List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds);
-    List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
-    List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs);
-
-    allJobs.addAll(dbJobs);
-    return allJobs;
+    }).transform(new Function<JobInfo, String>() {
+      @Nullable
+      @Override
+      public String apply(@Nullable JobInfo input) {
+        return input.getJobId();
+      }
+    }).toList();
+    List<Job> dbJobs = readJobsFromDbByJobId(jobIds);
+    LOG.debug("readJobsByIds: dbJobs : {}", dbJobs);
+    return dbJobs;
   }
 
   /**
@@ -124,17 +121,24 @@ public class Aggregator {
    * @param jobsIds: list of ids of jobs
    * @return: list of all the jobs found
    */
-  private List<Job> readJobsFromDbByJobId(List<String> jobsIds) {
-    List<Job> jobs = new LinkedList<>();
-    for (final String jid : jobsIds) {
-      try {
-        Job job = getJobFromDbByJobId(jid);
-        jobs.add(job);
-      } catch (ItemNotFound itemNotFound) {
-        LOG.error("Error while finding job with id : {}", jid, itemNotFound);
+  private List<Job> readJobsFromDbByJobId(final List<String> jobsIds) {
+    LOG.info("Reading jobs from db with ids : {} ", jobsIds);
+    List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() {
+      @Override
+      public boolean isConform(Indexed item) {
+        JobImpl job = (JobImpl) item;
+        return jobsIds.contains(job.getId());
       }
-    }
 
+      @Override
+      public String whereStatement() {
+        String query = " id in ( " + Joiner.on(",").join(jobsIds) + " ) ";
+        LOG.debug("where clause for jobsIds : {}", query);
+        return query;
+      }
+    });
+
+    LOG.debug("jobs returned from DB : {}" , jobs);
     return jobs;
   }
 
@@ -172,44 +176,49 @@ public class Aggregator {
    * @return
    */
   public List<Job> readAll(String username) {
-    List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username);
-    LOG.debug("HiveQueryIds fetched : {}", queries);
-    List<Job> allJobs = fetchDagsAndMergeJobs(queries);
-    List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null);
-    LOG.debug("Jobs only present in DB: {}", dbOnlyJobs);
-    allJobs.addAll(dbOnlyJobs);
-    return allJobs;
+    return readAllForUserByTime(username, null, null);
   }
 
   /**
    * reads all the jobs from DB for username and excludes the jobs mentioned in queries list
    * @param username : username for which the jobs are to be read.
-   * @param queries : the jobs to exclude
    * @param startTime: can be null, if not then the window start time for job
    * @param endTime: can be null, if not then the window end time for job
    * @return : the jobs in db that are not in the queries
    */
-  private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) {
-    List<Job> dbOnlyJobs = new LinkedList<>();
-    HashMap<String, String> operationIdVsHiveId = new HashMap<>();
-
-    for (HiveQueryId hqid : queries) {
-      operationIdVsHiveId.put(hqid.operationId, hqid.entity);
-    }
-    LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId);
-    //cover case when operationId is present, but not exists in ATS
-    //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE"
-    List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username));
-    for (Job job : jobs) {
-      if (null != startTime && null != endTime && null != job.getDateSubmitted()
-        && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime || operationIdVsHiveId.containsKey(job.getGuid()))
-        ) {
-        continue; // don't include this in the result
-      } else {
-        dbOnlyJobs.add(job);
+  private List<Job> readDBJobs(final String username, final Long startTime, final Long endTime) {
+    List<Job> jobs = viewJobResourceManager.readAll( new FilteringStrategy() {
+      @Override
+      public boolean isConform(Indexed item) {
+        JobImpl job = (JobImpl) item;
+        return job.getOwner().compareTo(username) == 0 &&
+          ( (null == startTime || job.getDateSubmitted() >= startTime ) &&
+            ( null == endTime || job.getDateSubmitted() < endTime )
+          );
       }
-    }
-    return dbOnlyJobs;
+      @Override
+      public String whereStatement() {
+        StringBuilder sb = new StringBuilder( "owner = '" ).append( username ).append( "'" );
+        if( null != startTime || null != endTime ) {
+          sb.append(" AND ( " );
+          if( null != startTime ) {
+            sb.append( " dateSubmitted >= " ).append( startTime );
+          }
+          if( null != endTime ){
+            if(null != startTime){
+              sb.append(" AND ");
+            }
+            sb.append(" dateSubmitted < ").append(endTime);
+          }
+          sb.append( " ) " );
+        }
+        String where = sb.toString();
+        LOG.debug("where statement : {}", where);
+        return where;
+      }
+    });
+    LOG.debug("returning jobs: {}", jobs);
+    return jobs;
   }
 
   private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) {