You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/05 00:04:54 UTC

[13/50] [abbrv] ambari git commit: AMBARI-19321 : Hive View 2.0 - Minimal view for Hive which includes new UI changes. Also made changes in poms as required (nitirajrathore)

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java
new file mode 100644
index 0000000..a3623e9
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.files;
+
+import com.jayway.jsonpath.JsonPath;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.commons.hdfs.UserService;
+import org.apache.ambari.view.hive20.BaseService;
+import org.apache.ambari.view.hive20.utils.*;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+
+/**
+ * File access resource
+ * API:
+ * GET /:path
+ *      read entire file
+ * POST /
+ *      create new file
+ *      Required: filePath
+ *      file should not already exists
+ * PUT /:path
+ *      update file content
+ */
+public class FileService extends BaseService {
+  public static final String FAKE_FILE = "fakefile://";
+  public static final String JSON_PATH_FILE = "jsonpath:";
+
+  @Inject
+  ViewResourceHandler handler;
+
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(FileService.class);
+
+  /**
+   * Get single item
+   */
+  @GET
+  @Path("{filePath:.*}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getFilePage(@PathParam("filePath") String filePath, @QueryParam("page") Long page) throws IOException, InterruptedException {
+
+    LOG.debug("Reading file " + filePath);
+    try {
+      FileResource file = new FileResource();
+
+      if (page == null)
+        page = 0L;
+
+      if (filePath.startsWith(FAKE_FILE)) {
+        if (page > 1)
+          throw new IllegalArgumentException("There's only one page in fake files");
+
+        String encodedContent = filePath.substring(FAKE_FILE.length());
+        String content = new String(Base64.decodeBase64(encodedContent));
+
+        fillFakeFileObject(filePath, file, content);
+      } else if (filePath.startsWith(JSON_PATH_FILE)) {
+        if (page > 1)
+          throw new IllegalArgumentException("There's only one page in fake files");
+
+        String content = getJsonPathContentByUrl(filePath);
+        fillFakeFileObject(filePath, file, content);
+      } else  {
+
+        filePath = sanitizeFilePath(filePath);
+        FilePaginator paginator = new FilePaginator(filePath, getSharedObjectsFactory().getHdfsApi());
+
+        fillRealFileObject(filePath, page, file, paginator);
+      }
+
+      JSONObject object = new JSONObject();
+      object.put("file", file);
+      return Response.ok(object).status(200).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (FileNotFoundException ex) {
+      throw new NotFoundFormattedException(ex.getMessage(), ex);
+    } catch (IllegalArgumentException ex) {
+      throw new BadRequestFormattedException(ex.getMessage(), ex);
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  protected String getJsonPathContentByUrl(String filePath) throws IOException {
+    URL url = new URL(filePath.substring(JSON_PATH_FILE.length()));
+
+    InputStream responseInputStream = context.getURLStreamProvider().readFrom(url.toString(), "GET",
+        (String)null, new HashMap<String, String>());
+    String response = IOUtils.toString(responseInputStream);
+
+    for (String ref : url.getRef().split("!")) {
+      response = JsonPath.read(response, ref);
+    }
+    return response;
+  }
+
+  public void fillRealFileObject(String filePath, Long page, FileResource file, FilePaginator paginator) throws IOException, InterruptedException {
+    file.setFilePath(filePath);
+    file.setFileContent(paginator.readPage(page));
+    file.setHasNext(paginator.pageCount() > page + 1);
+    file.setPage(page);
+    file.setPageCount(paginator.pageCount());
+  }
+
+  public void fillFakeFileObject(String filePath, FileResource file, String content) {
+    file.setFilePath(filePath);
+    file.setFileContent(content);
+    file.setHasNext(false);
+    file.setPage(0);
+    file.setPageCount(1);
+  }
+
+  /**
+   * Delete single item
+   */
+  @DELETE
+  @Path("{filePath:.*}")
+  public Response deleteFile(@PathParam("filePath") String filePath) throws IOException, InterruptedException {
+    try {
+      filePath = sanitizeFilePath(filePath);
+      LOG.debug("Deleting file " + filePath);
+      if (getSharedObjectsFactory().getHdfsApi().delete(filePath, false)) {
+        return Response.status(204).build();
+      }
+      throw new NotFoundFormattedException("FileSystem.delete returned false", null);
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Update item
+   */
+  @PUT
+  @Path("{filePath:.*}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response updateFile(FileResourceRequest request,
+                             @PathParam("filePath") String filePath) throws IOException, InterruptedException {
+    try {
+      filePath = sanitizeFilePath(filePath);
+      LOG.debug("Rewriting file " + filePath);
+      FSDataOutputStream output = getSharedObjectsFactory().getHdfsApi().create(filePath, true);
+      output.writeBytes(request.file.getFileContent());
+      output.close();
+      return Response.status(204).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Create script
+   */
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response createFile(FileResourceRequest request,
+                             @Context HttpServletResponse response, @Context UriInfo ui)
+      throws IOException, InterruptedException {
+    try {
+      LOG.debug("Creating file " + request.file.getFilePath());
+      try {
+        FSDataOutputStream output = getSharedObjectsFactory().getHdfsApi().create(request.file.getFilePath(), false);
+        if (request.file.getFileContent() != null) {
+          output.writeBytes(request.file.getFileContent());
+        }
+        output.close();
+      } catch (FileAlreadyExistsException ex) {
+        throw new ServiceFormattedException("F020 File already exists", ex, 400);
+      }
+      response.setHeader("Location",
+          String.format("%s/%s", ui.getAbsolutePath().toString(), request.file.getFilePath()));
+      return Response.status(204).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Checks connection to HDFS
+   * @param context View Context
+   */
+  public static void hdfsSmokeTest(ViewContext context) {
+    try {
+      HdfsApi api = HdfsUtil.connectToHDFSApi(context);
+      api.getStatus();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Checks connection to User HomeDirectory
+   * @param context View Context
+   */
+  public static void userhomeSmokeTest(ViewContext context) {
+    try {
+      UserService userservice = new UserService(context);
+      userservice.homeDir();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Wrapper object for json mapping
+   */
+  public static class FileResourceRequest {
+    public FileResource file;
+  }
+
+  private String sanitizeFilePath(String filePath){
+    if (!filePath.startsWith("/") && !filePath.startsWith(".")) {
+      filePath = "/" + filePath;  // some servers strip double slashes in URL
+    }
+    return filePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java
new file mode 100644
index 0000000..c70585e
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive20.persistence.utils.FilteringStrategy;
+import org.apache.ambari.view.hive20.persistence.utils.Indexed;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive20.resources.IResourceManager;
+import org.apache.ambari.view.hive20.resources.files.FileService;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.HiveQueryId;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.TezDagId;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobInfo;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * View Jobs and ATS Jobs aggregator.
+ * There are 4 options:
+ * 1) ATS ExecuteJob without operationId
+ *    *Meaning*: executed outside of HS2
+ *    - ExecuteJob info only from ATS
+ * 2) ATS ExecuteJob with operationId
+ *    a) Hive View ExecuteJob with same operationId is not present
+ *        *Meaning*: executed with HS2
+ *      - ExecuteJob info only from ATS
+ *    b) Hive View ExecuteJob with operationId is present (need to merge)
+ *        *Meaning*: executed with HS2 through Hive View
+ *      - ExecuteJob info merged from ATS and from Hive View DataStorage
+ * 3) ExecuteJob present only in Hive View, ATS does not have it
+ *   *Meaning*: executed through Hive View, but Hadoop ExecuteJob was not created
+ *   it can happen if user executes query without aggregation, like just "select * from TABLE"
+ *   - ExecuteJob info only from Hive View
+ */
+public class Aggregator {
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(Aggregator.class);
+
+  private final IATSParser ats;
+  private IResourceManager<Job> viewJobResourceManager;
+  private final ActorRef operationController;
+
+  public Aggregator(IResourceManager<Job> jobResourceManager,
+                    IATSParser ats, ActorRef operationController) {
+    this.viewJobResourceManager = jobResourceManager;
+    this.ats = ats;
+    this.operationController = operationController;
+  }
+
+  /**
+   * gets all the jobs for 'username' where the job submission time is between 'startTime' (inclusive)
+   * and endTime (exclusive).
+   * Fetches the jobs from ATS and DB merges and update DB. returns the combined list.
+   *
+   * @param username:  username for which jobs have to be fetched.
+   * @param startTime: inclusive, time in secs from epoch
+   * @param endTime:   exclusive, time in secs from epoch
+   * @return: list of jobs
+   */
+  public List<Job> readAllForUserByTime(String username, long startTime, long endTime) {
+    List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime);
+    List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
+    List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime);
+    allJobs.addAll(dbOnlyJobs);
+
+    return allJobs;
+  }
+
+  /**
+   * fetches the new state of jobs from ATS and from DB. Does merging/updating as required.
+   * @param jobInfos: infos of job to get
+   * @return: list of updated Job
+   */
+  public List<Job> readJobsByIds(List<JobInfo> jobInfos) {
+    //categorize jobs
+    List<String> jobsWithHiveIds = new LinkedList<>();
+    List<String> dbOnlyJobs = new LinkedList<>();
+
+    for (JobInfo jobInfo : jobInfos) {
+      if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) {
+        dbOnlyJobs.add(jobInfo.getJobId());
+      } else {
+        jobsWithHiveIds.add(jobInfo.getHiveId());
+      }
+    }
+
+    List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds);
+    List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
+    List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs);
+
+    allJobs.addAll(dbJobs);
+    return allJobs;
+  }
+
+  /**
+   * gets the jobs from the Database given their id
+   * @param jobsIds: list of ids of jobs
+   * @return: list of all the jobs found
+   */
+  private List<Job> readJobsFromDbByJobId(List<String> jobsIds) {
+    List<Job> jobs = new LinkedList<>();
+    for (final String jid : jobsIds) {
+      try {
+        Job job = getJobFromDbByJobId(jid);
+        jobs.add(job);
+      } catch (ItemNotFound itemNotFound) {
+        LOG.error("Error while finding job with id : {}", jid, itemNotFound);
+      }
+    }
+
+    return jobs;
+  }
+
+  /**
+   * fetches the job from DB given its id
+   * @param jobId: the id of the job to fetch
+   * @return: the job
+   * @throws ItemNotFound: if job with given id is not found in db
+   */
+  private Job getJobFromDbByJobId(final String jobId) throws ItemNotFound {
+    if (null == jobId)
+      return null;
+
+    List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() {
+      @Override
+      public boolean isConform(Indexed item) {
+        return item.getId().equals(jobId);
+      }
+
+      @Override
+      public String whereStatement() {
+        return "id = '" + jobId + "'"; // even IDs are string
+      }
+    });
+
+    if (null != jobs && !jobs.isEmpty())
+      return jobs.get(0);
+
+    throw new ItemNotFound(String.format("Job with id %s not found.", jobId));
+  }
+
+  /**
+   * returns all the jobs from ATS and DB (for this instance) for the given user.
+   * @param username
+   * @return
+   */
+  public List<Job> readAll(String username) {
+    List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username);
+    LOG.debug("HiveQueryIds fetched : {}", queries);
+    List<Job> allJobs = fetchDagsAndMergeJobs(queries);
+    List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null);
+    LOG.debug("Jobs only present in DB: {}", dbOnlyJobs);
+    allJobs.addAll(dbOnlyJobs);
+    return allJobs;
+  }
+
+  /**
+   * reads all the jobs from DB for username and excludes the jobs mentioned in queries list
+   * @param username : username for which the jobs are to be read.
+   * @param queries : the jobs to exclude
+   * @param startTime: can be null, if not then the window start time for job
+   * @param endTime: can be null, if not then the window end time for job
+   * @return : the jobs in db that are not in the queries
+   */
+  private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) {
+    List<Job> dbOnlyJobs = new LinkedList<>();
+    HashMap<String, String> operationIdVsHiveId = new HashMap<>();
+
+    for (HiveQueryId hqid : queries) {
+      operationIdVsHiveId.put(hqid.operationId, hqid.entity);
+    }
+    LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId);
+    //cover case when operationId is present, but not exists in ATS
+    //e.g. optimized queries without executing jobs, like "SELECT * FROM TABLE"
+    List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username));
+    for (Job job : jobs) {
+      if (null != startTime && null != endTime && null != job.getDateSubmitted()
+        && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime || operationIdVsHiveId.containsKey(job.getGuid()))
+        ) {
+        continue; // don't include this in the result
+      } else {
+        dbOnlyJobs.add(job);
+      }
+    }
+    return dbOnlyJobs;
+  }
+
+  private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) {
+    List<Job> allJobs = new LinkedList<Job>();
+
+    for (HiveQueryId atsHiveQuery : queries) {
+      JobImpl atsJob = null;
+      if (hasOperationId(atsHiveQuery)) {
+        try {
+          Job viewJob = getJobByOperationId(atsHiveQuery.operationId);
+          TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+          atsJob = mergeHiveAtsTez(atsHiveQuery, atsTezDag, viewJob);
+        } catch (ItemNotFound itemNotFound) {
+          LOG.error("Ignore : {}", itemNotFound.getMessage());
+          continue;
+        }
+      } else {
+        TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+        atsJob = atsOnlyJob(atsHiveQuery, atsTezDag);
+      }
+
+      atsJob.setHiveQueryId(atsHiveQuery.entity);
+      allJobs.add(atsJob);
+    }
+
+    return allJobs;
+  }
+
+  /**
+   * @param atsHiveQuery
+   * @param atsTezDag
+   * @param viewJob
+   * @return
+   */
+  private JobImpl mergeHiveAtsTez(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) throws ItemNotFound {
+    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
+    return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+  }
+
+  public Job readATSJob(Job viewJob) throws ItemNotFound {
+
+    if (viewJob.getStatus().equals(Job.JOB_STATE_INITIALIZED) || viewJob.getStatus().equals(Job.JOB_STATE_UNKNOWN))
+      return viewJob;
+
+    String hexGuid = viewJob.getGuid();
+
+
+    HiveQueryId atsHiveQuery = ats.getHiveQueryIdByOperationId(hexGuid);
+
+    TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+
+    saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob, true);
+    return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+  }
+
+  private TezDagId getTezDagFromHiveQueryId(HiveQueryId atsHiveQuery) {
+    TezDagId atsTezDag;
+    if (atsHiveQuery.version >= HiveQueryId.ATS_15_RESPONSE_VERSION) {
+      atsTezDag = ats.getTezDAGByEntity(atsHiveQuery.entity);
+    } else if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
+      String dagName = atsHiveQuery.dagNames.get(0);
+
+      atsTezDag = ats.getTezDAGByName(dagName);
+    } else {
+      atsTezDag = new TezDagId();
+    }
+    return atsTezDag;
+  }
+
+  protected boolean hasOperationId(HiveQueryId atsHiveQuery) {
+    return atsHiveQuery.operationId != null;
+  }
+
+  protected JobImpl mergeAtsJobWithViewJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) {
+    JobImpl atsJob;
+    try {
+      atsJob = new JobImpl(PropertyUtils.describe(viewJob));
+    } catch (IllegalAccessException e) {
+      LOG.error("Can't instantiate JobImpl", e);
+      return null;
+    } catch (InvocationTargetException e) {
+      LOG.error("Can't instantiate JobImpl", e);
+      return null;
+    } catch (NoSuchMethodException e) {
+      LOG.error("Can't instantiate JobImpl", e);
+      return null;
+    }
+    fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag);
+    return atsJob;
+  }
+
+  protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound {
+    saveJobInfoIfNeeded(hiveQueryId, tezDagId, viewJob, false);
+  }
+
+  protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob, boolean useActorSystem) throws ItemNotFound {
+    boolean updateDb = false;
+    String dagName = null;
+    String dagId = null;
+    String applicationId = null;
+    if (viewJob.getDagName() == null || viewJob.getDagName().isEmpty()) {
+      if (hiveQueryId.dagNames != null && hiveQueryId.dagNames.size() > 0) {
+        dagName = hiveQueryId.dagNames.get(0);
+        updateDb = true;
+      }
+    }
+    if (tezDagId.status != null && (tezDagId.status.compareToIgnoreCase(Job.JOB_STATE_UNKNOWN) != 0) &&
+        !viewJob.getStatus().equalsIgnoreCase(tezDagId.status)) {
+      dagId = tezDagId.entity;
+      applicationId = tezDagId.applicationId;
+      updateDb = true;
+    }
+
+    if(updateDb) {
+      if (useActorSystem) {
+        LOG.info("Saving DAG information via actor system for job id: {}", viewJob.getId());
+        operationController.tell(new SaveDagInformation(viewJob.getId(), dagName, dagId, applicationId), ActorRef.noSender());
+      } else {
+        viewJob.setDagName(dagName);
+        viewJob.setDagId(dagId);
+        viewJob.setApplicationId(applicationId);
+        viewJobResourceManager.update(viewJob, viewJob.getId());
+      }
+    }
+  }
+
+  protected JobImpl atsOnlyJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag) {
+    JobImpl atsJob = new JobImpl();
+    atsJob.setId(atsHiveQuery.entity);
+    fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag);
+
+    String query = atsHiveQuery.query;
+    atsJob.setTitle(query.substring(0, (query.length() > 42) ? 42 : query.length()));
+
+    atsJob.setQueryFile(FileService.JSON_PATH_FILE + atsHiveQuery.url + "#otherinfo.QUERY!queryText");
+    return atsJob;
+  }
+
+  protected JobImpl fillAtsJobFields(JobImpl atsJob, HiveQueryId atsHiveQuery, TezDagId atsTezDag) {
+    atsJob.setApplicationId(atsTezDag.applicationId);
+
+    if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0)
+      atsJob.setDagName(atsHiveQuery.dagNames.get(0));
+    atsJob.setDagId(atsTezDag.entity);
+    if (atsHiveQuery.starttime != 0)
+      atsJob.setDateSubmitted(atsHiveQuery.starttime);
+    atsJob.setDuration(atsHiveQuery.duration);
+    return atsJob;
+  }
+
+  protected Job getJobByOperationId(final String opId) throws ItemNotFound {
+    List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() {
+      @Override
+      public boolean isConform(Indexed item) {
+        Job opHandle = (Job) item;
+        return opHandle.getGuid().equals(opId);
+      }
+
+      @Override
+      public String whereStatement() {
+        return "guid='" + opId + "'";
+      }
+    });
+
+    if (jobs.size() != 1)
+      throw new ItemNotFound();
+
+    return jobs.get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java
new file mode 100644
index 0000000..6156933
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import org.apache.ambari.view.*;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.*;
+import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Resource provider for job
+ */
+public class JobResourceProvider implements ResourceProvider<Job> {
+  @Inject
+  ViewContext context;
+
+  protected JobResourceManager resourceManager = null;
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(JobResourceProvider.class);
+
+  protected synchronized JobResourceManager getResourceManager() {
+    if (resourceManager == null) {
+      resourceManager = new JobResourceManager(new SharedObjectsFactory(context), context);
+    }
+    return resourceManager;
+  }
+
+  @Override
+  public Job getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+    try {
+      return getResourceManager().read(resourceId);
+    } catch (ItemNotFound itemNotFound) {
+      throw new NoSuchResourceException(resourceId);
+    }
+  }
+
+  @Override
+  public Set<Job> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+    if (context == null) {
+      return new HashSet<Job>();
+    }
+    return new HashSet<Job>(getResourceManager().readAll(
+        new OnlyOwnersFilteringStrategy(this.context.getUsername())));
+  }
+
+  @Override
+  public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException {
+    Job item = null;
+    try {
+      item = new JobImpl(stringObjectMap);
+    } catch (InvocationTargetException e) {
+      throw new SystemException("error on creating resource", e);
+    } catch (IllegalAccessException e) {
+      throw new SystemException("error on creating resource", e);
+    }
+    getResourceManager().create(item);
+    JobController jobController = new SharedObjectsFactory(context).getJobControllerFactory().createControllerForJob(item);
+    try {
+      jobController.submit();
+    } catch (Throwable throwable) {
+      throw new SystemException("error on creating resource", throwable);
+    }
+  }
+
+  @Override
+  public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+    Job item = null;
+    try {
+      item = new JobImpl(stringObjectMap);
+    } catch (InvocationTargetException e) {
+      throw new SystemException("error on updating resource", e);
+    } catch (IllegalAccessException e) {
+      throw new SystemException("error on updating resource", e);
+    }
+    try {
+      getResourceManager().update(item, resourceId);
+    } catch (ItemNotFound itemNotFound) {
+      throw new NoSuchResourceException(resourceId);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+    try {
+      getResourceManager().delete(resourceId);
+    } catch (ItemNotFound itemNotFound) {
+      throw new NoSuchResourceException(resourceId);
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java
new file mode 100644
index 0000000..675ea37
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import akka.actor.ActorRef;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.hive20.BaseService;
+import org.apache.ambari.view.hive20.ConnectionFactory;
+import org.apache.ambari.view.hive20.ConnectionSystem;
+import org.apache.ambari.view.hive20.actor.message.job.Failure;
+import org.apache.ambari.view.hive20.backgroundjobs.BackgroundJobController;
+import org.apache.ambari.view.hive20.client.AsyncJobRunner;
+import org.apache.ambari.view.hive20.client.AsyncJobRunnerImpl;
+import org.apache.ambari.view.hive20.client.ColumnDescription;
+import org.apache.ambari.view.hive20.client.Cursor;
+import org.apache.ambari.view.hive20.client.EmptyCursor;
+import org.apache.ambari.view.hive20.client.HiveClientException;
+import org.apache.ambari.view.hive20.client.NonPersistentCursor;
+import org.apache.ambari.view.hive20.client.Row;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobInfo;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager;
+import org.apache.ambari.view.hive20.utils.MisconfigurationFormattedException;
+import org.apache.ambari.view.hive20.utils.NotFoundFormattedException;
+import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
+import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriInfo;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.lang.reflect.InvocationTargetException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Servlet for queries
+ * API:
+ * GET /:id
+ *      read job
+ * POST /
+ *      create new job
+ *      Required: title, queryFile
+ * GET /
+ *      get all Jobs of current user
+ */
+public class JobService extends BaseService {
+  @Inject
+  ViewResourceHandler handler;
+
+  private JobResourceManager resourceManager;
+
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(JobService.class);
+  private Aggregator aggregator;
+
+  protected synchronized JobResourceManager getResourceManager() {
+    if (resourceManager == null) {
+      SharedObjectsFactory connectionsFactory = getSharedObjectsFactory();
+      resourceManager = new JobResourceManager(connectionsFactory, context);
+    }
+    return resourceManager;
+  }
+
+
+  protected Aggregator getAggregator() {
+    if (aggregator == null) {
+      IATSParser atsParser = getSharedObjectsFactory().getATSParser();
+      ActorRef operationController = ConnectionSystem.getInstance().getOperationController(context);
+      aggregator = new Aggregator(getResourceManager(), atsParser, operationController);
+    }
+    return aggregator;
+  }
+
+  protected void setAggregator(Aggregator aggregator) {
+    this.aggregator = aggregator;
+  }
+
+  /**
+   * Get single item
+   */
+  @GET
+  @Path("{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getOne(@PathParam("jobId") String jobId) {
+    try {
+      JobController jobController = getResourceManager().readController(jobId);
+
+      Job job = jobController.getJob();
+      if(job.getStatus().equals(Job.JOB_STATE_ERROR) || job.getStatus().equals(Job.JOB_STATE_CANCELED)){
+        ConnectionSystem system = ConnectionSystem.getInstance();
+        final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+        Optional<Failure> error = asyncJobRunner.getError(jobId, context.getUsername());
+
+        if(error.isPresent()){
+          Throwable th = error.get().getError();
+          if(th instanceof SQLException){
+            SQLException sqlException = (SQLException) th;
+            if(sqlException.getSQLState().equals("AUTHFAIL") && ConnectionFactory.isLdapEnabled(context))
+              return Response.status(401).build();
+          }
+          throw new Exception(th);
+        }
+      }
+
+      JSONObject jsonJob = jsonObjectFromJob(jobController);
+      return Response.ok(jsonJob).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (ItemNotFound itemNotFound) {
+      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  private JSONObject jsonObjectFromJob(JobController jobController) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+    Job hiveJob = jobController.getJobPOJO();
+
+    Job mergedJob;
+    try {
+      mergedJob = getAggregator().readATSJob(hiveJob);
+    } catch (ItemNotFound itemNotFound) {
+      throw new ServiceFormattedException("E010 ExecuteJob not found", itemNotFound);
+    }
+    Map createdJobMap = PropertyUtils.describe(mergedJob);
+    createdJobMap.remove("class"); // no need to show Bean class on client
+
+    JSONObject jobJson = new JSONObject();
+    jobJson.put("job", createdJobMap);
+    return jobJson;
+  }
+
+  /**
+   * Get job results in csv format
+   */
+  @GET
+  @Path("{jobId}/results/csv")
+  @Produces("text/csv")
+  public Response getResultsCSV(@PathParam("jobId") String jobId,
+                                @Context HttpServletResponse response,
+                                @QueryParam("fileName") String fileName,
+                                @QueryParam("columns") final String requestedColumns) {
+    try {
+
+      final String username = context.getUsername();
+
+      ConnectionSystem system = ConnectionSystem.getInstance();
+      final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+
+      Optional<NonPersistentCursor> cursorOptional = asyncJobRunner.resetAndGetCursor(jobId, username);
+
+      if(!cursorOptional.isPresent()){
+        throw new Exception("Download failed");
+      }
+
+      final NonPersistentCursor resultSet = cursorOptional.get();
+
+
+      StreamingOutput stream = new StreamingOutput() {
+        @Override
+        public void write(OutputStream os) throws IOException, WebApplicationException {
+          Writer writer = new BufferedWriter(new OutputStreamWriter(os));
+          CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+          try {
+
+            List<ColumnDescription> descriptions = resultSet.getDescriptions();
+            List<String> headers = Lists.newArrayList();
+            for (ColumnDescription description : descriptions) {
+              headers.add(description.getName());
+            }
+
+            csvPrinter.printRecord(headers.toArray());
+
+            while (resultSet.hasNext()) {
+              csvPrinter.printRecord(resultSet.next().getRow());
+              writer.flush();
+            }
+          } finally {
+            writer.close();
+          }
+        }
+      };
+
+      if (fileName == null || fileName.isEmpty()) {
+        fileName = "results.csv";
+      }
+
+      return Response.ok(stream).
+          header("Content-Disposition", String.format("attachment; filename=\"%s\"", fileName)).
+          build();
+
+
+    } catch (WebApplicationException ex) {
+      throw ex;
+    }  catch (Throwable ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Get job results in csv format
+   */
+  @GET
+  @Path("{jobId}/results/csv/saveToHDFS")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getResultsToHDFS(@PathParam("jobId") String jobId,
+                                   @QueryParam("commence") String commence,
+                                   @QueryParam("file") final String targetFile,
+                                   @QueryParam("stop") final String stop,
+                                   @QueryParam("columns") final String requestedColumns,
+                                   @Context HttpServletResponse response) {
+    try {
+
+      final JobController jobController = getResourceManager().readController(jobId);
+      final String username = context.getUsername();
+
+      String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId());
+      if (commence != null && commence.equals("true")) {
+        if (targetFile == null)
+          throw new MisconfigurationFormattedException("targetFile should not be empty");
+
+        ConnectionSystem system = ConnectionSystem.getInstance();
+        final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+
+        Optional<NonPersistentCursor> cursorOptional = asyncJobRunner.resetAndGetCursor(jobId, username);
+
+        if(!cursorOptional.isPresent()){
+          throw new Exception("Download failed");
+        }
+
+        final NonPersistentCursor resultSet = cursorOptional.get();
+
+        BackgroundJobController.getInstance(context).startJob(String.valueOf(backgroundJobId), new Runnable() {
+          @Override
+          public void run() {
+
+            try {
+
+              FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true);
+              Writer writer = new BufferedWriter(new OutputStreamWriter(stream));
+              CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+              try {
+                while (resultSet.hasNext() && !Thread.currentThread().isInterrupted()) {
+                  csvPrinter.printRecord(resultSet.next().getRow());
+                  writer.flush();
+                }
+              } finally {
+                writer.close();
+              }
+              stream.close();
+
+            } catch (IOException e) {
+              throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
+            } catch (InterruptedException e) {
+              throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
+            }
+          }
+        });
+      }
+
+      if (stop != null && stop.equals("true")) {
+        BackgroundJobController.getInstance(context).interrupt(backgroundJobId);
+      }
+
+      JSONObject object = new JSONObject();
+      object.put("stopped", BackgroundJobController.getInstance(context).isInterrupted(backgroundJobId));
+      object.put("jobId", jobController.getJob().getId());
+      object.put("backgroundJobId", backgroundJobId);
+      object.put("operationType", "CSV2HDFS");
+      object.put("status", BackgroundJobController.getInstance(context).state(backgroundJobId).toString());
+
+      return Response.ok(object).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (ItemNotFound itemNotFound) {
+      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+
+  @Path("{jobId}/status")
+  @GET
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response fetchJobStatus(@PathParam("jobId") String jobId) throws ItemNotFound, HiveClientException, NoOperationStatusSetException {
+    JobController jobController = getResourceManager().readController(jobId);
+    Job job = jobController.getJob();
+    String jobStatus = job.getStatus();
+
+
+    LOG.info("jobStatus : {} for jobId : {}",jobStatus, jobId);
+
+    JSONObject jsonObject = new JSONObject();
+    jsonObject.put("jobStatus", jobStatus);
+    jsonObject.put("jobId", jobId);
+
+    return Response.ok(jsonObject).build();
+  }
+
+  /**
+   * Get next results page
+   */
+  @GET
+  @Path("{jobId}/results")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getResults(@PathParam("jobId") final String jobId,
+                             @QueryParam("first") final String fromBeginning,
+                             @QueryParam("count") Integer count,
+                             @QueryParam("searchId") String searchId,
+                             @QueryParam("format") String format,
+                             @QueryParam("columns") final String requestedColumns) {
+    try {
+
+      final String username = context.getUsername();
+
+      ConnectionSystem system = ConnectionSystem.getInstance();
+      final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+
+      return ResultsPaginationController.getInstance(context)
+              .request(jobId, searchId, true, fromBeginning, count, format,requestedColumns,
+                      new Callable<Cursor< Row, ColumnDescription >>() {
+                        @Override
+                        public Cursor call() throws Exception {
+                          Optional<NonPersistentCursor> cursor;
+                          if(fromBeginning != null && fromBeginning.equals("true")){
+                            cursor = asyncJobRunner.resetAndGetCursor(jobId, username);
+                          }
+                          else {
+                            cursor = asyncJobRunner.getCursor(jobId, username);
+                          }
+                          if(cursor.isPresent())
+                          return cursor.get();
+                          else
+                            return new EmptyCursor();
+                        }
+                      }).build();
+
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Renew expiration time for results
+   */
+  @GET
+  @Path("{jobId}/results/keepAlive")
+  public Response keepAliveResults(@PathParam("jobId") String jobId,
+                             @QueryParam("first") String fromBeginning,
+                             @QueryParam("count") Integer count) {
+    try {
+      if (!ResultsPaginationController.getInstance(context).keepAlive(jobId, ResultsPaginationController.DEFAULT_SEARCH_ID)) {
+        throw new NotFoundFormattedException("Results already expired", null);
+      }
+      return Response.ok().build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Get progress info
+   */
+  @GET
+  @Path("{jobId}/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getProgress(@PathParam("jobId") String jobId) {
+    try {
+      final JobController jobController = getResourceManager().readController(jobId);
+
+      ProgressRetriever.Progress progress = new ProgressRetriever(jobController.getJob(), getSharedObjectsFactory()).
+          getProgress();
+
+      return Response.ok(progress).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (ItemNotFound itemNotFound) {
+      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Delete single item
+   */
+  @DELETE
+  @Path("{id}")
+  public Response delete(@PathParam("id") String id,
+                         @QueryParam("remove") final String remove) {
+    try {
+      JobController jobController;
+      try {
+        jobController = getResourceManager().readController(id);
+      } catch (ItemNotFound itemNotFound) {
+        throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+      }
+      jobController.cancel();
+      if (remove != null && remove.compareTo("true") == 0) {
+        getResourceManager().delete(id);
+      }
+      return Response.status(204).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (ItemNotFound itemNotFound) {
+      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Get all Jobs
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getList(@QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime) {
+    try {
+
+      LOG.debug("Getting all job: startTime: {}, endTime: {}",startTime,endTime);
+      List<Job> allJobs = getAggregator().readAllForUserByTime(context.getUsername(),startTime, endTime);
+      for(Job job : allJobs) {
+        job.setSessionTag(null);
+      }
+      JSONObject result = new JSONObject();
+      result.put("jobs", allJobs);
+      return Response.ok(result).build();
+    } catch (WebApplicationException ex) {
+      LOG.error("Exception occured while fetching all jobs.", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Exception occured while fetching all jobs.", ex);
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * fetch the jobs with given info.
+   * provide as much info about the job so that next api can optimize the fetch process.
+   * @param jobInfos
+   * @return
+   */
+  @Path("/getList")
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  public List<Job> getList(List<JobInfo> jobInfos) {
+    try {
+      LOG.debug("fetching jobs with ids :{}", jobInfos);
+      List<Job> allJobs = getAggregator().readJobsByIds(jobInfos);
+      for(Job job : allJobs) {
+        job.setSessionTag(null);
+      }
+
+      return allJobs;
+    } catch (WebApplicationException ex) {
+      LOG.error("Exception occured while fetching all jobs.", ex);
+      throw ex;
+    } catch (Exception ex) {
+      LOG.error("Exception occured while fetching all jobs.", ex);
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Create job
+   */
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response create(JobRequest request, @Context HttpServletResponse response,
+                         @Context UriInfo ui) {
+    try {
+      Map jobInfo = PropertyUtils.describe(request.job);
+      Job job = new JobImpl(jobInfo);
+      JobController createdJobController = new JobServiceInternal().createJob(job, getResourceManager());
+      JSONObject jobObject = jsonObjectFromJob(createdJobController);
+      response.setHeader("Location",
+        String.format("%s/%s", ui.getAbsolutePath().toString(), job.getId()));
+      return Response.ok(jobObject).status(201).build();
+    } catch (WebApplicationException ex) {
+      LOG.error("Error occurred while creating job : ",ex);
+      throw ex;
+    } catch (ItemNotFound itemNotFound) {
+      LOG.error("Error occurred while creating job : ",itemNotFound);
+      throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+    } catch (Throwable ex) {
+      LOG.error("Error occurred while creating job : ",ex);
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Remove connection credentials
+   */
+  @DELETE
+  @Path("auth")
+  public Response removePassword() {
+    try {
+      //new UserLocalHiveAuthCredentials().remove(context);
+      //connectionLocal.remove(context);  // force reconnect on next get
+      return Response.ok().status(200).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+
+  /**
+   * Invalidate session
+   */
+  @DELETE
+  @Path("sessions/{sessionTag}")
+  public Response invalidateSession(@PathParam("sessionTag") String sessionTag) {
+    try {
+      //Connection connection = connectionLocal.get(context);
+      //connection.invalidateSessionByTag(sessionTag);
+      return Response.ok().build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Session status
+   */
+  @GET
+  @Path("sessions/{sessionTag}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response sessionStatus(@PathParam("sessionTag") String sessionTag) {
+    try {
+      //Connection connection = connectionLocal.get(context);
+
+      JSONObject session = new JSONObject();
+      session.put("sessionTag", sessionTag);
+      try {
+        //connection.getSessionByTag(sessionTag);
+        session.put("actual", true);
+      } catch (Exception /*HiveClientException*/ ex) {
+        session.put("actual", false);
+      }
+
+      //TODO: New implementation
+
+      JSONObject status = new JSONObject();
+      status.put("session", session);
+      return Response.ok(status).build();
+    } catch (WebApplicationException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new ServiceFormattedException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Wrapper object for json mapping
+   */
+  public static class JobRequest {
+    public JobImpl job;
+  }
+
+  /**
+   * Wrapper for authentication json mapping
+   */
+  public static class AuthRequest {
+    public String password;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
new file mode 100644
index 0000000..1409ba8
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager;
+
+public class JobServiceInternal {
+  public JobController createJob(Job job, JobResourceManager resourceManager) throws Throwable {
+    resourceManager.create(job);
+
+    JobController createdJobController = resourceManager.readController(job.getId());
+    createdJobController.submit();
+    resourceManager.saveIfModified(createdJobController);
+    return createdJobController;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java
new file mode 100644
index 0000000..073cdc7
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+public interface ModifyNotificationDelegate {
+  boolean onModification(Object object);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java
new file mode 100644
index 0000000..51058f5
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+public class ModifyNotificationInvocationHandler implements InvocationHandler {
+  private Object proxied;
+  private ModifyNotificationDelegate modifyDelegate;
+
+  public ModifyNotificationInvocationHandler(Object proxied, ModifyNotificationDelegate delegate) {
+    this.proxied = proxied;
+    this.modifyDelegate = delegate;
+  }
+
+  @Override
+  public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+    if (method.getName().startsWith("set")) {
+      modifyDelegate.onModification(proxied);
+    }
+    return method.invoke(proxied, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java
new file mode 100644
index 0000000..31d97d0
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+
+public class NoOperationStatusSetException extends Exception {
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java
new file mode 100644
index 0000000..4d8c7d7
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.TezVertexId;
+import org.apache.ambari.view.hive20.resources.jobs.rm.RMParser;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
+import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
+
+import java.util.List;
+
+public class ProgressRetriever {
+  private final Progress progress;
+  private final Job job;
+  private final SharedObjectsFactory sharedObjects;
+
+  public ProgressRetriever(Job job, SharedObjectsFactory sharedObjects) {
+    this.job = job;
+    this.sharedObjects = sharedObjects;
+
+    this.progress = new Progress();
+  }
+
+  public Progress getProgress() {
+    jobCheck();
+
+    progress.dagProgress = sharedObjects.getRMParser().getDAGProgress(
+        job.getApplicationId(), job.getDagId());
+
+    List<TezVertexId> vertices = sharedObjects.getATSParser().getVerticesForDAGId(job.getDagId());
+    progress.vertexProgresses = sharedObjects.getRMParser().getDAGVerticesProgress(job.getApplicationId(), job.getDagId(), vertices);
+
+    return progress;
+  }
+
+  public void jobCheck() {
+    if (job.getApplicationId() == null || job.getApplicationId().isEmpty()) {
+      throw new ServiceFormattedException("E070 ApplicationId is not defined yet");
+    }
+    if (job.getDagId() == null || job.getDagId().isEmpty()) {
+      throw new ServiceFormattedException("E080 DagID is not defined yet");
+    }
+  }
+
+  public static class Progress {
+    public Double dagProgress;
+    public List<RMParser.VertexProgress> vertexProgresses;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java
new file mode 100644
index 0000000..6efa2a9
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.client.ColumnDescription;
+import org.apache.ambari.view.hive20.client.Cursor;
+import org.apache.ambari.view.hive20.client.HiveClientException;
+import org.apache.ambari.view.hive20.client.Row;
+import org.apache.ambari.view.hive20.utils.BadRequestFormattedException;
+import org.apache.ambari.view.hive20.utils.ResultFetchFormattedException;
+import org.apache.ambari.view.hive20.utils.ResultNotReadyFormattedException;
+import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
+import org.apache.commons.collections4.map.PassiveExpiringMap;
+import org.apache.hadoop.hbase.util.Strings;
+
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * Results Pagination Controller
+ * Persists cursors for result sets
+ */
+public class ResultsPaginationController {
+  public static final String DEFAULT_SEARCH_ID = "default";
+  private static Map<String, ResultsPaginationController> viewSingletonObjects = new HashMap<String, ResultsPaginationController>();
+  public static ResultsPaginationController getInstance(ViewContext context) {
+    if (!viewSingletonObjects.containsKey(context.getInstanceName()))
+      viewSingletonObjects.put(context.getInstanceName(), new ResultsPaginationController());
+    return viewSingletonObjects.get(context.getInstanceName());
+  }
+
+  public ResultsPaginationController() {
+  }
+
+  private static final long EXPIRING_TIME = 10*60*1000;  // 10 minutes
+  private static final int DEFAULT_FETCH_COUNT = 50;
+  private Map<String, Cursor<Row, ColumnDescription>> resultsCache;
+
+  public static class CustomTimeToLiveExpirationPolicy extends PassiveExpiringMap.ConstantTimeToLiveExpirationPolicy<String, Cursor<Row, ColumnDescription>> {
+    public CustomTimeToLiveExpirationPolicy(long timeToLiveMillis) {
+      super(timeToLiveMillis);
+    }
+
+    @Override
+    public long expirationTime(String key, Cursor<Row, ColumnDescription> value) {
+      if (key.startsWith("$")) {
+        return -1;  //never expire
+      }
+      return super.expirationTime(key, value);
+    }
+  }
+
+  private Map<String, Cursor<Row, ColumnDescription>> getResultsCache() {
+    if (resultsCache == null) {
+      PassiveExpiringMap<String, Cursor<Row, ColumnDescription>> resultsCacheExpiringMap =
+          new PassiveExpiringMap<>(new CustomTimeToLiveExpirationPolicy(EXPIRING_TIME));
+      resultsCache = Collections.synchronizedMap(resultsCacheExpiringMap);
+    }
+    return resultsCache;
+  }
+
+  /**
+   * Renew timer of cache entry.
+   * @param key name/id of results request
+   * @return false if entry not found; true if renew was ok
+   */
+  public boolean keepAlive(String key, String searchId) {
+    if (searchId == null)
+      searchId = DEFAULT_SEARCH_ID;
+    String effectiveKey = key + "?" + searchId;
+    if (!getResultsCache().containsKey(effectiveKey)) {
+      return false;
+    }
+    Cursor cursor = getResultsCache().get(effectiveKey);
+    getResultsCache().put(effectiveKey, cursor);
+    cursor.keepAlive();
+    return true;
+  }
+
+  private Cursor<Row, ColumnDescription> getResultsSet(String key, Callable<Cursor<Row, ColumnDescription>> makeResultsSet) {
+    if (!getResultsCache().containsKey(key)) {
+      Cursor resultSet;
+      try {
+        resultSet = makeResultsSet.call();
+        if (resultSet.isResettable()) {
+          resultSet.reset();
+        }
+      } catch (ResultNotReadyFormattedException | ResultFetchFormattedException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        throw new ServiceFormattedException(ex.getMessage(), ex);
+      }
+      getResultsCache().put(key, resultSet);
+    }
+
+    return getResultsCache().get(key);
+  }
+
+  public Response.ResponseBuilder request(String key, String searchId, boolean canExpire, String fromBeginning, Integer count, String format, String requestedColumns, Callable<Cursor<Row, ColumnDescription>> makeResultsSet) throws HiveClientException {
+    if (searchId == null)
+      searchId = DEFAULT_SEARCH_ID;
+    key = key + "?" + searchId;
+    if (!canExpire)
+      key = "$" + key;
+    if (fromBeginning != null && fromBeginning.equals("true") && getResultsCache().containsKey(key)) {
+
+      getResultsCache().remove(key);
+    }
+
+    Cursor<Row, ColumnDescription> resultSet = getResultsSet(key, makeResultsSet);
+
+    if (count == null)
+      count = DEFAULT_FETCH_COUNT;
+
+    List<ColumnDescription> allschema = resultSet.getDescriptions();
+    List<Row> allRowEntries = FluentIterable.from(resultSet)
+      .limit(count).toList();
+
+    List<ColumnDescription> schema = allschema;
+
+    final Set<Integer> selectedColumns = getRequestedColumns(requestedColumns);
+    if (!selectedColumns.isEmpty()) {
+      schema = filter(allschema, selectedColumns);
+    }
+
+    List<Object[]> rows = FluentIterable.from(allRowEntries)
+      .transform(new Function<Row, Object[]>() {
+        @Override
+        public Object[] apply(Row input) {
+          if(!selectedColumns.isEmpty()) {
+            return filter(Lists.newArrayList(input.getRow()), selectedColumns).toArray();
+          } else {
+            return input.getRow();
+          }
+        }
+      }).toList();
+
+    int read = rows.size();
+    if(format != null && format.equalsIgnoreCase("d3")) {
+      List<Map<String,Object>> results = new ArrayList<>();
+      for(int i=0; i<rows.size(); i++) {
+        Object[] row = rows.get(i);
+        Map<String, Object> keyValue = new HashMap<>(row.length);
+        for(int j=0; j<row.length; j++) {
+          //Replace dots in schema with underscore
+          String schemaName = schema.get(j).getName();
+          keyValue.put(schemaName.replace('.','_'), row[j]);
+        }
+        results.add(keyValue);
+      }
+      return Response.ok(results);
+    } else {
+      ResultsResponse resultsResponse = new ResultsResponse();
+      resultsResponse.setSchema(schema);
+      resultsResponse.setRows(rows);
+      resultsResponse.setReadCount(read);
+      resultsResponse.setHasNext(resultSet.hasNext());
+      //      resultsResponse.setSize(resultSet.size());
+      resultsResponse.setOffset(resultSet.getOffset());
+      resultsResponse.setHasResults(true);
+      return Response.ok(resultsResponse);
+    }
+  }
+
+  private <T> List<T> filter(List<T> list, Set<Integer> selectedColumns) {
+    List<T> filtered = Lists.newArrayList();
+    for(int i: selectedColumns) {
+      if(list != null && list.get(i) != null)
+        filtered.add(list.get(i));
+    }
+
+    return filtered;
+  }
+
+  private Set<Integer> getRequestedColumns(String requestedColumns) {
+    if(Strings.isEmpty(requestedColumns)) {
+      return new HashSet<>();
+    }
+    Set<Integer> selectedColumns = Sets.newHashSet();
+    for (String columnRequested : requestedColumns.split(",")) {
+      try {
+        selectedColumns.add(Integer.parseInt(columnRequested));
+      } catch (NumberFormatException ex) {
+        throw new BadRequestFormattedException("Columns param should be comma-separated integers", ex);
+      }
+    }
+    return selectedColumns;
+  }
+
+  private static class ResultsResponse {
+    private List<ColumnDescription> schema;
+    private List<String[]> rows;
+    private int readCount;
+    private boolean hasNext;
+    private long offset;
+    private boolean hasResults;
+
+    public void setSchema(List<ColumnDescription> schema) {
+      this.schema = schema;
+    }
+
+    public List<ColumnDescription> getSchema() {
+      return schema;
+    }
+
+    public void setRows(List<Object[]> rows) {
+      if( null == rows ){
+        this.rows = null;
+      }
+      this.rows = new ArrayList<String[]>(rows.size());
+      for(Object[] row : rows ){
+        String[] strs = new String[row.length];
+        for( int colNum = 0 ; colNum < row.length ; colNum++ ){
+          String value = String.valueOf(row[colNum]);
+          if(row[colNum] != null && (value.isEmpty() || value.equalsIgnoreCase("null"))){
+            strs[colNum] = String.format("\"%s\"",value);
+          }else{
+            strs[colNum] = value;
+          }
+        }
+        this.rows.add(strs);
+      }
+    }
+
+    public List<String[]> getRows() {
+      return rows;
+    }
+
+    public void setReadCount(int readCount) {
+      this.readCount = readCount;
+    }
+
+    public void setHasNext(boolean hasNext) {
+      this.hasNext = hasNext;
+    }
+
+    public boolean isHasNext() {
+      return hasNext;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    public void setOffset(long offset) {
+      this.offset = offset;
+    }
+
+    public boolean getHasResults() {
+      return hasResults;
+    }
+
+    public void setHasResults(boolean hasResults) {
+      this.hasResults = hasResults;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java
new file mode 100644
index 0000000..6e9753d
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs.atsJobs;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Parser of ATS responses
+ */
+public class ATSParser implements IATSParser {
+  protected final static Logger LOG =
+    LoggerFactory.getLogger(ATSParser.class);
+
+  private ATSRequestsDelegate delegate;
+
+  private static final long MillisInSecond = 1000L;
+
+  public ATSParser(ATSRequestsDelegate delegate) {
+    this.delegate = delegate;
+  }
+
+  /**
+   * returns all HiveQueryIDs from ATS for the given user.
+   * @param username
+   * @return
+   */
+  @Override
+  public List<HiveQueryId> getHiveQueryIdsForUser(String username) {
+    JSONObject entities = delegate.hiveQueryIdsForUser(username);
+    return parseHqidJsonFromATS(entities);
+  }
+
+  /**
+   * parses the JSONArray or hive query IDs
+   * @param entities: should contain 'entities' element as JSONArray
+   * @return
+   */
+  private List<HiveQueryId> parseHqidJsonFromATS(JSONObject entities) {
+    JSONArray jobs = (JSONArray) entities.get("entities");
+
+    return getHqidListFromJsonArray(jobs);
+  }
+
+  /**
+   * parses List of HiveQueryIds from JSON
+   * @param jobs
+   * @return
+   */
+  private List<HiveQueryId> getHqidListFromJsonArray(JSONArray jobs) {
+    List<HiveQueryId> parsedJobs = new LinkedList<>();
+    for (Object job : jobs) {
+      try {
+        HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job);
+        parsedJobs.add(parsedJob);
+      } catch (Exception ex) {
+        LOG.error("Error while parsing ATS job", ex);
+      }
+    }
+
+    return parsedJobs;
+  }
+
+  @Override
+  public List<TezVertexId> getVerticesForDAGId(String dagId) {
+    JSONObject entities = delegate.tezVerticesListForDAG(dagId);
+    JSONArray vertices = (JSONArray) entities.get("entities");
+
+    List<TezVertexId> parsedVertices = new LinkedList<TezVertexId>();
+    for(Object vertex : vertices) {
+      try {
+        TezVertexId parsedVertex = parseVertex((JSONObject) vertex);
+        parsedVertices.add(parsedVertex);
+      } catch (Exception ex) {
+        LOG.error("Error while parsing the vertex", ex);
+      }
+    }
+
+    return parsedVertices;
+  }
+
+  @Override
+  public HiveQueryId getHiveQueryIdByOperationId(String guidString) {
+    JSONObject entities = delegate.hiveQueryIdByOperationId(guidString);
+    return getHiveQueryIdFromJson(entities);
+  }
+
+  private HiveQueryId getHiveQueryIdFromJson(JSONObject entities) {
+    JSONArray jobs = (JSONArray) entities.get("entities");
+
+    if (jobs.size() == 0) {
+      return new HiveQueryId();
+    }
+
+    return parseAtsHiveJob((JSONObject) jobs.get(0));
+  }
+
+  /**
+   * returns the hive entity from ATS. empty object if not found.
+   *
+   * @param hiveId: the entityId of the hive
+   * @return: empty entity if not found else HiveQueryId
+   */
+  @Override
+  public HiveQueryId getHiveQueryIdByHiveEntityId(String hiveId) {
+    JSONObject entity = delegate.hiveQueryEntityByEntityId(hiveId);
+    return parseAtsHiveJob(entity);
+  }
+
+  @Override
+  public TezDagId getTezDAGByName(String name) {
+    JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities");
+    return parseTezDag(tezDagEntities);
+  }
+
+  @Override
+  public TezDagId getTezDAGByEntity(String entity) {
+    JSONArray tezDagEntities = (JSONArray) delegate.tezDagByEntity(entity).get("entities");
+    return parseTezDag(tezDagEntities);
+  }
+
+  /**
+   * fetches the HIVE_QUERY_ID from ATS for given user between given time period
+   *
+   * @param username:  username for which to fetch hive query IDs
+   * @param startTime: time in miliseconds, inclusive
+   * @param endTime:   time in miliseconds, exclusive
+   * @return: List of HIVE_QUERY_ID
+   */
+  @Override
+  public List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime) {
+    JSONObject entities = delegate.hiveQueryIdsForUserByTime(username, startTime, endTime);
+    return parseHqidJsonFromATS(entities);
+  }
+
+  @Override
+  public List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveIds) {
+    List<HiveQueryId> hiveQueryIds = new LinkedList<>();
+    for (String id : hiveIds) {
+      HiveQueryId hqi = this.getHiveQueryIdByHiveEntityId(id);
+      if (null != hqi.entity) {
+        hiveQueryIds.add(hqi);
+      }
+    }
+    return hiveQueryIds;
+  }
+
+  private TezDagId parseTezDag(JSONArray tezDagEntities) {
+    assert tezDagEntities.size() <= 1;
+    if (tezDagEntities.size() == 0) {
+      return new TezDagId();
+    }
+    JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0);
+
+    TezDagId parsedDag = new TezDagId();
+    JSONArray applicationIds = (JSONArray) ((JSONObject) tezDagEntity.get("primaryfilters")).get("applicationId");
+    parsedDag.entity = (String) tezDagEntity.get("entity");
+    parsedDag.applicationId = (String) applicationIds.get(0);
+    parsedDag.status = (String) ((JSONObject) tezDagEntity.get("otherinfo")).get("status");
+    return parsedDag;
+  }
+
+  private HiveQueryId parseAtsHiveJob(JSONObject job) {
+    HiveQueryId parsedJob = new HiveQueryId();
+
+    parsedJob.entity = (String) job.get("entity");
+    parsedJob.url = delegate.hiveQueryIdDirectUrl((String) job.get("entity"));
+    parsedJob.starttime = ((Long) job.get("starttime"));
+
+    JSONObject primaryfilters = (JSONObject) job.get("primaryfilters");
+    JSONArray operationIds = (JSONArray) primaryfilters.get("operationid");
+    if (operationIds != null) {
+      parsedJob.operationId = (String) (operationIds).get(0);
+    }
+    JSONArray users = (JSONArray) primaryfilters.get("user");
+    if (users != null) {
+      parsedJob.user = (String) (users).get(0);
+    }
+
+    JSONObject lastEvent = getLastEvent(job);
+    long lastEventTimestamp = ((Long) lastEvent.get("timestamp"));
+
+    parsedJob.duration = (lastEventTimestamp - parsedJob.starttime) / MillisInSecond;
+
+    JSONObject otherinfo = (JSONObject) job.get("otherinfo");
+    if (otherinfo.get("QUERY") != null) {  // workaround for HIVE-10829
+      JSONObject query = (JSONObject) JSONValue.parse((String) otherinfo.get("QUERY"));
+
+      parsedJob.query = (String) query.get("queryText");
+      JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS");
+
+      List<String> dagIds = new LinkedList<String>();
+      List<JSONObject> stagesList = new LinkedList<JSONObject>();
+
+      for (Object key : stages.keySet()) {
+        JSONObject stage = (JSONObject) stages.get(key);
+        if (stage.get("Tez") != null) {
+          String dagId = (String) ((JSONObject) stage.get("Tez")).get("DagId:");
+          dagIds.add(dagId);
+        }
+        stagesList.add(stage);
+      }
+      parsedJob.dagNames = dagIds;
+      parsedJob.stages = stagesList;
+    }
+
+    if (otherinfo.get("VERSION") != null) {
+      parsedJob.version = (Long) otherinfo.get("VERSION");
+    }
+    return parsedJob;
+  }
+
+  private TezVertexId parseVertex(JSONObject vertex) {
+    TezVertexId tezVertexId = new TezVertexId();
+    tezVertexId.entity = (String)vertex.get("entity");
+    JSONObject otherinfo = (JSONObject)vertex.get("otherinfo");
+    if (otherinfo != null)
+      tezVertexId.vertexName = (String)otherinfo.get("vertexName");
+    return tezVertexId;
+  }
+
+  private JSONObject getLastEvent(JSONObject atsEntity) {
+    JSONArray events = (JSONArray) atsEntity.get("events");
+    return (JSONObject) events.get(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java
new file mode 100644
index 0000000..343202e
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.ViewContext;
+
+public class ATSParserFactory {
+
+  private ViewContext context;
+
+  public ATSParserFactory(ViewContext context) {
+    this.context = context;
+  }
+
+  public ATSParser getATSParser() {
+    ATSRequestsDelegateImpl delegate = new ATSRequestsDelegateImpl(context, getATSUrl(context));
+    return new ATSParser(delegate);
+  }
+
+  public static String getATSUrl(ViewContext context) {
+    return context.getProperties().get("yarn.ats.url");
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java
new file mode 100644
index 0000000..dac42aa
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs.atsJobs;
+
+import org.json.simple.JSONObject;
+
+public interface ATSRequestsDelegate {
+  String hiveQueryIdDirectUrl(String entity);
+
+  String hiveQueryIdOperationIdUrl(String operationId);
+
+  String tezDagDirectUrl(String entity);
+
+  String tezDagNameUrl(String name);
+
+  String tezVerticesListForDAGUrl(String dagId);
+
+  JSONObject hiveQueryIdsForUser(String username);
+
+  JSONObject hiveQueryIdByOperationId(String operationId);
+
+  JSONObject tezDagByName(String name);
+
+  JSONObject tezVerticesListForDAG(String dagId);
+
+  JSONObject tezDagByEntity(String entity);
+
+  JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime);
+
+  JSONObject hiveQueryEntityByEntityId(String hiveEntityId);
+}