Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/05 00:04:54 UTC
[13/50] [abbrv] ambari git commit: AMBARI-19321 : Hive View 2.0 -
Minimal view for Hive which includes new UI changes. Also made changes in
poms as required (nitirajrathore)
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java
new file mode 100644
index 0000000..a3623e9
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/files/FileService.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.files;
+
+import com.jayway.jsonpath.JsonPath;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.commons.hdfs.UserService;
+import org.apache.ambari.view.hive20.BaseService;
+import org.apache.ambari.view.hive20.utils.*;
+import org.apache.ambari.view.utils.hdfs.HdfsApi;
+import org.apache.ambari.view.utils.hdfs.HdfsUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashMap;
+
+/**
+ * File access resource
+ * API:
+ * GET /:path
+ * read entire file
+ * POST /
+ * create new file
+ * Required: filePath
+ * file must not already exist
+ * PUT /:path
+ * update file content
+ */
+public class FileService extends BaseService {
+ public static final String FAKE_FILE = "fakefile://";
+ public static final String JSON_PATH_FILE = "jsonpath:";
+
+ @Inject
+ ViewResourceHandler handler;
+
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(FileService.class);
+
+ /**
+ * Get single item
+ */
+ @GET
+ @Path("{filePath:.*}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getFilePage(@PathParam("filePath") String filePath, @QueryParam("page") Long page) throws IOException, InterruptedException {
+
+ LOG.debug("Reading file " + filePath);
+ try {
+ FileResource file = new FileResource();
+
+ if (page == null)
+ page = 0L;
+
+ if (filePath.startsWith(FAKE_FILE)) {
+ if (page > 1)
+ throw new IllegalArgumentException("There's only one page in fake files");
+
+ String encodedContent = filePath.substring(FAKE_FILE.length());
+ String content = new String(Base64.decodeBase64(encodedContent));
+
+ fillFakeFileObject(filePath, file, content);
+ } else if (filePath.startsWith(JSON_PATH_FILE)) {
+ if (page > 1)
+ throw new IllegalArgumentException("There's only one page in JSON path files");
+
+ String content = getJsonPathContentByUrl(filePath);
+ fillFakeFileObject(filePath, file, content);
+ } else {
+
+ filePath = sanitizeFilePath(filePath);
+ FilePaginator paginator = new FilePaginator(filePath, getSharedObjectsFactory().getHdfsApi());
+
+ fillRealFileObject(filePath, page, file, paginator);
+ }
+
+ JSONObject object = new JSONObject();
+ object.put("file", file);
+ return Response.ok(object).status(200).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (FileNotFoundException ex) {
+ throw new NotFoundFormattedException(ex.getMessage(), ex);
+ } catch (IllegalArgumentException ex) {
+ throw new BadRequestFormattedException(ex.getMessage(), ex);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ protected String getJsonPathContentByUrl(String filePath) throws IOException {
+ URL url = new URL(filePath.substring(JSON_PATH_FILE.length()));
+
+ InputStream responseInputStream = context.getURLStreamProvider().readFrom(url.toString(), "GET",
+ (String)null, new HashMap<String, String>());
+ String response = IOUtils.toString(responseInputStream);
+
+ for (String ref : url.getRef().split("!")) {
+ response = JsonPath.read(response, ref);
+ }
+ return response;
+ }
+
+ public void fillRealFileObject(String filePath, Long page, FileResource file, FilePaginator paginator) throws IOException, InterruptedException {
+ file.setFilePath(filePath);
+ file.setFileContent(paginator.readPage(page));
+ file.setHasNext(paginator.pageCount() > page + 1);
+ file.setPage(page);
+ file.setPageCount(paginator.pageCount());
+ }
+
+ public void fillFakeFileObject(String filePath, FileResource file, String content) {
+ file.setFilePath(filePath);
+ file.setFileContent(content);
+ file.setHasNext(false);
+ file.setPage(0);
+ file.setPageCount(1);
+ }
+
+ /**
+ * Delete single item
+ */
+ @DELETE
+ @Path("{filePath:.*}")
+ public Response deleteFile(@PathParam("filePath") String filePath) throws IOException, InterruptedException {
+ try {
+ filePath = sanitizeFilePath(filePath);
+ LOG.debug("Deleting file " + filePath);
+ if (getSharedObjectsFactory().getHdfsApi().delete(filePath, false)) {
+ return Response.status(204).build();
+ }
+ throw new NotFoundFormattedException("FileSystem.delete returned false", null);
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Update item
+ */
+ @PUT
+ @Path("{filePath:.*}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response updateFile(FileResourceRequest request,
+ @PathParam("filePath") String filePath) throws IOException, InterruptedException {
+ try {
+ filePath = sanitizeFilePath(filePath);
+ LOG.debug("Rewriting file " + filePath);
+ FSDataOutputStream output = getSharedObjectsFactory().getHdfsApi().create(filePath, true);
+ output.writeBytes(request.file.getFileContent());
+ output.close();
+ return Response.status(204).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Create script
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response createFile(FileResourceRequest request,
+ @Context HttpServletResponse response, @Context UriInfo ui)
+ throws IOException, InterruptedException {
+ try {
+ LOG.debug("Creating file " + request.file.getFilePath());
+ try {
+ FSDataOutputStream output = getSharedObjectsFactory().getHdfsApi().create(request.file.getFilePath(), false);
+ if (request.file.getFileContent() != null) {
+ output.writeBytes(request.file.getFileContent());
+ }
+ output.close();
+ } catch (FileAlreadyExistsException ex) {
+ throw new ServiceFormattedException("F020 File already exists", ex, 400);
+ }
+ response.setHeader("Location",
+ String.format("%s/%s", ui.getAbsolutePath().toString(), request.file.getFilePath()));
+ return Response.status(204).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Checks connection to HDFS
+ * @param context View Context
+ */
+ public static void hdfsSmokeTest(ViewContext context) {
+ try {
+ HdfsApi api = HdfsUtil.connectToHDFSApi(context);
+ api.getStatus();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Checks connection to User HomeDirectory
+ * @param context View Context
+ */
+ public static void userhomeSmokeTest(ViewContext context) {
+ try {
+ UserService userservice = new UserService(context);
+ userservice.homeDir();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Wrapper object for json mapping
+ */
+ public static class FileResourceRequest {
+ public FileResource file;
+ }
+
+ private String sanitizeFilePath(String filePath){
+ if (!filePath.startsWith("/") && !filePath.startsWith(".")) {
+ filePath = "/" + filePath; // some servers strip double slashes in URL
+ }
+ return filePath;
+ }
+}
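
A usage sketch (not part of this commit) of the three path schemes getFilePage() accepts: a real HDFS path, a "fakefile://" path carrying inline content, and a "jsonpath:" path. Base64 is the same commons-codec class the service uses; the ATS host and entity id below are hypothetical, modeled on the queryFile value Aggregator sets.

    import org.apache.commons.codec.binary.Base64;

    public class FilePathSketch {
      public static void main(String[] args) {
        // fakefile:// carries the whole (single-page) content inline, base64-encoded;
        // getFilePage() decodes everything after the prefix.
        String content = "select * from logs limit 10;";
        String fakePath = "fakefile://" + Base64.encodeBase64String(content.getBytes());

        // jsonpath: points at a URL whose JSON response is narrowed by the
        // '!'-separated JsonPath expressions in the URL fragment, exactly as
        // getJsonPathContentByUrl() splits url.getRef() on '!'.
        String jsonPath = "jsonpath:http://ats.example.com:8188/ws/v1/timeline/HIVE_QUERY_ID/"
            + "hive_20170105_0001#otherinfo.QUERY!queryText"; // hypothetical host and id

        System.out.println(fakePath);
        System.out.println(jsonPath);
      }
    }
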
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java
new file mode 100644
index 0000000..c70585e
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/Aggregator.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import akka.actor.ActorRef;
+import org.apache.ambari.view.hive20.actor.message.job.SaveDagInformation;
+import org.apache.ambari.view.hive20.persistence.utils.FilteringStrategy;
+import org.apache.ambari.view.hive20.persistence.utils.Indexed;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive20.resources.IResourceManager;
+import org.apache.ambari.view.hive20.resources.files.FileService;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.HiveQueryId;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.TezDagId;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobInfo;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * View Jobs and ATS Jobs aggregator.
+ * There are 4 options:
+ * 1) ATS ExecuteJob without operationId
+ * *Meaning*: executed outside of HS2
+ * - ExecuteJob info only from ATS
+ * 2) ATS ExecuteJob with operationId
+ * a) Hive View ExecuteJob with same operationId is not present
+ * *Meaning*: executed with HS2
+ * - ExecuteJob info only from ATS
+ * b) Hive View ExecuteJob with operationId is present (need to merge)
+ * *Meaning*: executed with HS2 through Hive View
+ * - ExecuteJob info merged from ATS and from Hive View DataStorage
+ * 3) ExecuteJob present only in Hive View, ATS does not have it
+ * *Meaning*: executed through Hive View, but no Hadoop ExecuteJob was created;
+ * this can happen when a query needs no aggregation, e.g. just "select * from TABLE"
+ * - ExecuteJob info only from Hive View
+ */
+public class Aggregator {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(Aggregator.class);
+
+ private final IATSParser ats;
+ private IResourceManager<Job> viewJobResourceManager;
+ private final ActorRef operationController;
+
+ public Aggregator(IResourceManager<Job> jobResourceManager,
+ IATSParser ats, ActorRef operationController) {
+ this.viewJobResourceManager = jobResourceManager;
+ this.ats = ats;
+ this.operationController = operationController;
+ }
+
+ /**
+ * Gets all the jobs for 'username' whose submission time is between 'startTime' (inclusive)
+ * and 'endTime' (exclusive).
+ * Fetches the jobs from ATS and the DB, merges them, updates the DB as required, and returns the combined list.
+ *
+ * @param username: username for which jobs have to be fetched.
+ * @param startTime: inclusive, time in secs from epoch
+ * @param endTime: exclusive, time in secs from epoch
+ * @return: list of jobs
+ */
+ public List<Job> readAllForUserByTime(String username, long startTime, long endTime) {
+ List<HiveQueryId> queryIdList = ats.getHiveQueryIdsForUserByTime(username, startTime, endTime);
+ List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
+ List<Job> dbOnlyJobs = readDBOnlyJobs(username, queryIdList, startTime, endTime);
+ allJobs.addAll(dbOnlyJobs);
+
+ return allJobs;
+ }
+
+ /**
+ * Fetches the current state of the jobs from ATS and from the DB, merging/updating as required.
+ * @param jobInfos: identifying info of the jobs to fetch
+ * @return: list of updated Jobs
+ */
+ public List<Job> readJobsByIds(List<JobInfo> jobInfos) {
+ //categorize jobs
+ List<String> jobsWithHiveIds = new LinkedList<>();
+ List<String> dbOnlyJobs = new LinkedList<>();
+
+ for (JobInfo jobInfo : jobInfos) {
+ if (null == jobInfo.getHiveId() || jobInfo.getHiveId().trim().isEmpty()) {
+ dbOnlyJobs.add(jobInfo.getJobId());
+ } else {
+ jobsWithHiveIds.add(jobInfo.getHiveId());
+ }
+ }
+
+ List<HiveQueryId> queryIdList = ats.getHiveQueryIdByEntityList(jobsWithHiveIds);
+ List<Job> allJobs = fetchDagsAndMergeJobs(queryIdList);
+ List<Job> dbJobs = readJobsFromDbByJobId(dbOnlyJobs);
+
+ allJobs.addAll(dbJobs);
+ return allJobs;
+ }
+
+ /**
+ * Gets the jobs from the database given their ids
+ * @param jobsIds: the ids of the jobs to fetch
+ * @return: list of all the jobs found
+ */
+ private List<Job> readJobsFromDbByJobId(List<String> jobsIds) {
+ List<Job> jobs = new LinkedList<>();
+ for (final String jid : jobsIds) {
+ try {
+ Job job = getJobFromDbByJobId(jid);
+ jobs.add(job);
+ } catch (ItemNotFound itemNotFound) {
+ LOG.error("Error while finding job with id : {}", jid, itemNotFound);
+ }
+ }
+
+ return jobs;
+ }
+
+ /**
+ * Fetches the job from the DB given its id
+ * @param jobId: the id of the job to fetch
+ * @return: the job
+ * @throws ItemNotFound: if a job with the given id is not found in the DB
+ */
+ private Job getJobFromDbByJobId(final String jobId) throws ItemNotFound {
+ if (null == jobId)
+ return null;
+
+ List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() {
+ @Override
+ public boolean isConform(Indexed item) {
+ return item.getId().equals(jobId);
+ }
+
+ @Override
+ public String whereStatement() {
+ return "id = '" + jobId + "'"; // even IDs are string
+ }
+ });
+
+ if (null != jobs && !jobs.isEmpty())
+ return jobs.get(0);
+
+ throw new ItemNotFound(String.format("Job with id %s not found.", jobId));
+ }
+
+ /**
+ * Returns all the jobs from ATS and the DB (for this instance) for the given user.
+ * @param username the user whose jobs are to be fetched
+ * @return the combined list of jobs
+ */
+ public List<Job> readAll(String username) {
+ List<HiveQueryId> queries = ats.getHiveQueryIdsForUser(username);
+ LOG.debug("HiveQueryIds fetched : {}", queries);
+ List<Job> allJobs = fetchDagsAndMergeJobs(queries);
+ List<Job> dbOnlyJobs = readDBOnlyJobs(username, queries, null, null);
+ LOG.debug("Jobs only present in DB: {}", dbOnlyJobs);
+ allJobs.addAll(dbOnlyJobs);
+ return allJobs;
+ }
+
+ /**
+ * Reads all the jobs from the DB for 'username', excluding the jobs present in the 'queries' list
+ * @param username : username for which the jobs are to be read
+ * @param queries : the jobs to exclude
+ * @param startTime: can be null; otherwise the window start time for the jobs
+ * @param endTime: can be null; otherwise the window end time for the jobs
+ * @return : the jobs in the DB that are not in 'queries'
+ */
+ private List<Job> readDBOnlyJobs(String username, List<HiveQueryId> queries, Long startTime, Long endTime) {
+ List<Job> dbOnlyJobs = new LinkedList<>();
+ HashMap<String, String> operationIdVsHiveId = new HashMap<>();
+
+ for (HiveQueryId hqid : queries) {
+ operationIdVsHiveId.put(hqid.operationId, hqid.entity);
+ }
+ LOG.debug("operationIdVsHiveId : {} ", operationIdVsHiveId);
+ //covers the case when an operationId is present but does not exist in ATS,
+ //e.g. optimized queries that never launch Hadoop jobs, like "SELECT * FROM TABLE"
+ List<Job> jobs = viewJobResourceManager.readAll(new OnlyOwnersFilteringStrategy(username));
+ for (Job job : jobs) {
+ if (null != startTime && null != endTime && null != job.getDateSubmitted()
+ && (job.getDateSubmitted() < startTime || job.getDateSubmitted() >= endTime || operationIdVsHiveId.containsKey(job.getGuid()))) {
+ continue; // outside the requested window, or already fetched from ATS
+ }
+ dbOnlyJobs.add(job);
+ }
+ return dbOnlyJobs;
+ }
+
+ private List<Job> fetchDagsAndMergeJobs(List<HiveQueryId> queries) {
+ List<Job> allJobs = new LinkedList<Job>();
+
+ for (HiveQueryId atsHiveQuery : queries) {
+ JobImpl atsJob = null;
+ if (hasOperationId(atsHiveQuery)) {
+ try {
+ Job viewJob = getJobByOperationId(atsHiveQuery.operationId);
+ TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+ atsJob = mergeHiveAtsTez(atsHiveQuery, atsTezDag, viewJob);
+ } catch (ItemNotFound itemNotFound) {
+ LOG.error("Ignore : {}", itemNotFound.getMessage());
+ continue;
+ }
+ } else {
+ TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+ atsJob = atsOnlyJob(atsHiveQuery, atsTezDag);
+ }
+
+ atsJob.setHiveQueryId(atsHiveQuery.entity);
+ allJobs.add(atsJob);
+ }
+
+ return allJobs;
+ }
+
+ /**
+ * Merges the ATS query and Tez DAG info into the view job, persisting DAG info if needed.
+ * @param atsHiveQuery the query as seen by ATS
+ * @param atsTezDag the Tez DAG associated with the query
+ * @param viewJob the job stored by the view
+ * @return the merged job
+ */
+ private JobImpl mergeHiveAtsTez(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) throws ItemNotFound {
+ saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob);
+ return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+ }
+
+ public Job readATSJob(Job viewJob) throws ItemNotFound {
+
+ if (viewJob.getStatus().equals(Job.JOB_STATE_INITIALIZED) || viewJob.getStatus().equals(Job.JOB_STATE_UNKNOWN))
+ return viewJob;
+
+ String hexGuid = viewJob.getGuid();
+
+
+ HiveQueryId atsHiveQuery = ats.getHiveQueryIdByOperationId(hexGuid);
+
+ TezDagId atsTezDag = getTezDagFromHiveQueryId(atsHiveQuery);
+
+ saveJobInfoIfNeeded(atsHiveQuery, atsTezDag, viewJob, true);
+ return mergeAtsJobWithViewJob(atsHiveQuery, atsTezDag, viewJob);
+ }
+
+ private TezDagId getTezDagFromHiveQueryId(HiveQueryId atsHiveQuery) {
+ TezDagId atsTezDag;
+ if (atsHiveQuery.version >= HiveQueryId.ATS_15_RESPONSE_VERSION) {
+ atsTezDag = ats.getTezDAGByEntity(atsHiveQuery.entity);
+ } else if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0) {
+ String dagName = atsHiveQuery.dagNames.get(0);
+
+ atsTezDag = ats.getTezDAGByName(dagName);
+ } else {
+ atsTezDag = new TezDagId();
+ }
+ return atsTezDag;
+ }
+
+ protected boolean hasOperationId(HiveQueryId atsHiveQuery) {
+ return atsHiveQuery.operationId != null;
+ }
+
+ protected JobImpl mergeAtsJobWithViewJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag, Job viewJob) {
+ JobImpl atsJob;
+ try {
+ atsJob = new JobImpl(PropertyUtils.describe(viewJob));
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ LOG.error("Can't instantiate JobImpl", e);
+ return null;
+ }
+ fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag);
+ return atsJob;
+ }
+
+ protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob) throws ItemNotFound {
+ saveJobInfoIfNeeded(hiveQueryId, tezDagId, viewJob, false);
+ }
+
+ protected void saveJobInfoIfNeeded(HiveQueryId hiveQueryId, TezDagId tezDagId, Job viewJob, boolean useActorSystem) throws ItemNotFound {
+ boolean updateDb = false;
+ String dagName = null;
+ String dagId = null;
+ String applicationId = null;
+ if (viewJob.getDagName() == null || viewJob.getDagName().isEmpty()) {
+ if (hiveQueryId.dagNames != null && hiveQueryId.dagNames.size() > 0) {
+ dagName = hiveQueryId.dagNames.get(0);
+ updateDb = true;
+ }
+ }
+ if (tezDagId.status != null && (tezDagId.status.compareToIgnoreCase(Job.JOB_STATE_UNKNOWN) != 0) &&
+ !viewJob.getStatus().equalsIgnoreCase(tezDagId.status)) {
+ dagId = tezDagId.entity;
+ applicationId = tezDagId.applicationId;
+ updateDb = true;
+ }
+
+ if(updateDb) {
+ if (useActorSystem) {
+ LOG.info("Saving DAG information via actor system for job id: {}", viewJob.getId());
+ operationController.tell(new SaveDagInformation(viewJob.getId(), dagName, dagId, applicationId), ActorRef.noSender());
+ } else {
+ viewJob.setDagName(dagName);
+ viewJob.setDagId(dagId);
+ viewJob.setApplicationId(applicationId);
+ viewJobResourceManager.update(viewJob, viewJob.getId());
+ }
+ }
+ }
+
+ protected JobImpl atsOnlyJob(HiveQueryId atsHiveQuery, TezDagId atsTezDag) {
+ JobImpl atsJob = new JobImpl();
+ atsJob.setId(atsHiveQuery.entity);
+ fillAtsJobFields(atsJob, atsHiveQuery, atsTezDag);
+
+ String query = atsHiveQuery.query;
+ atsJob.setTitle(query.substring(0, (query.length() > 42) ? 42 : query.length()));
+
+ atsJob.setQueryFile(FileService.JSON_PATH_FILE + atsHiveQuery.url + "#otherinfo.QUERY!queryText");
+ return atsJob;
+ }
+
+ protected JobImpl fillAtsJobFields(JobImpl atsJob, HiveQueryId atsHiveQuery, TezDagId atsTezDag) {
+ atsJob.setApplicationId(atsTezDag.applicationId);
+
+ if (atsHiveQuery.dagNames != null && atsHiveQuery.dagNames.size() > 0)
+ atsJob.setDagName(atsHiveQuery.dagNames.get(0));
+ atsJob.setDagId(atsTezDag.entity);
+ if (atsHiveQuery.starttime != 0)
+ atsJob.setDateSubmitted(atsHiveQuery.starttime);
+ atsJob.setDuration(atsHiveQuery.duration);
+ return atsJob;
+ }
+
+ protected Job getJobByOperationId(final String opId) throws ItemNotFound {
+ List<Job> jobs = viewJobResourceManager.readAll(new FilteringStrategy() {
+ @Override
+ public boolean isConform(Indexed item) {
+ Job opHandle = (Job) item;
+ return opHandle.getGuid().equals(opId);
+ }
+
+ @Override
+ public String whereStatement() {
+ return "guid='" + opId + "'";
+ }
+ });
+
+ if (jobs.size() != 1)
+ throw new ItemNotFound();
+
+ return jobs.get(0);
+ }
+}
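
The two anonymous FilteringStrategy instances above (in getJobFromDbByJobId and getJobByOperationId) show the persistence API's dual contract: a strategy must filter both in memory (isConform) and storage-side (whereStatement). A standalone sketch of the same pattern, with a hypothetical status criterion:

    import org.apache.ambari.view.hive20.persistence.utils.FilteringStrategy;
    import org.apache.ambari.view.hive20.persistence.utils.Indexed;
    import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;

    public class StatusFilteringStrategy implements FilteringStrategy {
      private final String status; // hypothetical criterion, e.g. Job.JOB_STATE_ERROR

      public StatusFilteringStrategy(String status) {
        this.status = status;
      }

      @Override
      public boolean isConform(Indexed item) {
        // in-memory variant, mirroring getJobByOperationId()'s cast to Job
        return ((Job) item).getStatus().equals(status);
      }

      @Override
      public String whereStatement() {
        // storage-side variant; like the originals, this concatenates the value
        return "status = '" + status + "'";
      }
    }
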
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java
new file mode 100644
index 0000000..6156933
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobResourceProvider.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import org.apache.ambari.view.*;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.*;
+import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Resource provider for job
+ */
+public class JobResourceProvider implements ResourceProvider<Job> {
+ @Inject
+ ViewContext context;
+
+ protected JobResourceManager resourceManager = null;
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(JobResourceProvider.class);
+
+ protected synchronized JobResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new JobResourceManager(new SharedObjectsFactory(context), context);
+ }
+ return resourceManager;
+ }
+
+ @Override
+ public Job getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ return getResourceManager().read(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ }
+
+ @Override
+ public Set<Job> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ if (context == null) {
+ return new HashSet<Job>();
+ }
+ return new HashSet<Job>(getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername())));
+ }
+
+ @Override
+ public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException {
+ Job item = null;
+ try {
+ item = new JobImpl(stringObjectMap);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new SystemException("error on creating resource", e);
+ }
+ getResourceManager().create(item);
+ JobController jobController = new SharedObjectsFactory(context).getJobControllerFactory().createControllerForJob(item);
+ try {
+ jobController.submit();
+ } catch (Throwable throwable) {
+ throw new SystemException("error on creating resource", throwable);
+ }
+ }
+
+ @Override
+ public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ Job item = null;
+ try {
+ item = new JobImpl(stringObjectMap);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new SystemException("error on updating resource", e);
+ }
+ try {
+ getResourceManager().update(item, resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ getResourceManager().delete(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+}
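
A sketch of the property map createResource() expects, assuming JobImpl's map constructor populates matching bean properties (its checked exceptions suggest reflective population); the title/queryFile keys come from JobService's class javadoc, and the HDFS path is hypothetical:

    import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
    import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;

    import java.lang.reflect.InvocationTargetException;
    import java.util.HashMap;
    import java.util.Map;

    public class JobMapSketch {
      public static Job buildJob() throws InvocationTargetException, IllegalAccessException {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("title", "Sample query");              // required by the jobs API
        props.put("queryFile", "/user/admin/query.hql"); // hypothetical HDFS path
        return new JobImpl(props);
      }
    }
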
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java
new file mode 100644
index 0000000..675ea37
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobService.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import akka.actor.ActorRef;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Optional;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.hive20.BaseService;
+import org.apache.ambari.view.hive20.ConnectionFactory;
+import org.apache.ambari.view.hive20.ConnectionSystem;
+import org.apache.ambari.view.hive20.actor.message.job.Failure;
+import org.apache.ambari.view.hive20.backgroundjobs.BackgroundJobController;
+import org.apache.ambari.view.hive20.client.AsyncJobRunner;
+import org.apache.ambari.view.hive20.client.AsyncJobRunnerImpl;
+import org.apache.ambari.view.hive20.client.ColumnDescription;
+import org.apache.ambari.view.hive20.client.Cursor;
+import org.apache.ambari.view.hive20.client.EmptyCursor;
+import org.apache.ambari.view.hive20.client.HiveClientException;
+import org.apache.ambari.view.hive20.client.NonPersistentCursor;
+import org.apache.ambari.view.hive20.client.Row;
+import org.apache.ambari.view.hive20.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobImpl;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobInfo;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager;
+import org.apache.ambari.view.hive20.utils.MisconfigurationFormattedException;
+import org.apache.ambari.view.hive20.utils.NotFoundFormattedException;
+import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
+import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
+import org.apache.commons.beanutils.PropertyUtils;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.UriInfo;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.lang.reflect.InvocationTargetException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * Servlet for queries
+ * API:
+ * GET /:id
+ * read job
+ * POST /
+ * create new job
+ * Required: title, queryFile
+ * GET /
+ * get all Jobs of current user
+ */
+public class JobService extends BaseService {
+ @Inject
+ ViewResourceHandler handler;
+
+ private JobResourceManager resourceManager;
+
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(JobService.class);
+ private Aggregator aggregator;
+
+ protected synchronized JobResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ SharedObjectsFactory connectionsFactory = getSharedObjectsFactory();
+ resourceManager = new JobResourceManager(connectionsFactory, context);
+ }
+ return resourceManager;
+ }
+
+
+ protected Aggregator getAggregator() {
+ if (aggregator == null) {
+ IATSParser atsParser = getSharedObjectsFactory().getATSParser();
+ ActorRef operationController = ConnectionSystem.getInstance().getOperationController(context);
+ aggregator = new Aggregator(getResourceManager(), atsParser, operationController);
+ }
+ return aggregator;
+ }
+
+ protected void setAggregator(Aggregator aggregator) {
+ this.aggregator = aggregator;
+ }
+
+ /**
+ * Get single item
+ */
+ @GET
+ @Path("{jobId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getOne(@PathParam("jobId") String jobId) {
+ try {
+ JobController jobController = getResourceManager().readController(jobId);
+
+ Job job = jobController.getJob();
+ if(job.getStatus().equals(Job.JOB_STATE_ERROR) || job.getStatus().equals(Job.JOB_STATE_CANCELED)){
+ ConnectionSystem system = ConnectionSystem.getInstance();
+ final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+ Optional<Failure> error = asyncJobRunner.getError(jobId, context.getUsername());
+
+ if(error.isPresent()){
+ Throwable th = error.get().getError();
+ if(th instanceof SQLException){
+ SQLException sqlException = (SQLException) th;
+ if(sqlException.getSQLState().equals("AUTHFAIL") && ConnectionFactory.isLdapEnabled(context))
+ return Response.status(401).build();
+ }
+ throw new Exception(th);
+ }
+ }
+
+ JSONObject jsonJob = jsonObjectFromJob(jobController);
+ return Response.ok(jsonJob).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ private JSONObject jsonObjectFromJob(JobController jobController) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+ Job hiveJob = jobController.getJobPOJO();
+
+ Job mergedJob;
+ try {
+ mergedJob = getAggregator().readATSJob(hiveJob);
+ } catch (ItemNotFound itemNotFound) {
+ throw new ServiceFormattedException("E010 ExecuteJob not found", itemNotFound);
+ }
+ Map createdJobMap = PropertyUtils.describe(mergedJob);
+ createdJobMap.remove("class"); // no need to show Bean class on client
+
+ JSONObject jobJson = new JSONObject();
+ jobJson.put("job", createdJobMap);
+ return jobJson;
+ }
+
+ /**
+ * Get job results in csv format
+ */
+ @GET
+ @Path("{jobId}/results/csv")
+ @Produces("text/csv")
+ public Response getResultsCSV(@PathParam("jobId") String jobId,
+ @Context HttpServletResponse response,
+ @QueryParam("fileName") String fileName,
+ @QueryParam("columns") final String requestedColumns) {
+ try {
+
+ final String username = context.getUsername();
+
+ ConnectionSystem system = ConnectionSystem.getInstance();
+ final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+
+ Optional<NonPersistentCursor> cursorOptional = asyncJobRunner.resetAndGetCursor(jobId, username);
+
+ if(!cursorOptional.isPresent()){
+ throw new Exception("Download failed");
+ }
+
+ final NonPersistentCursor resultSet = cursorOptional.get();
+
+
+ StreamingOutput stream = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException, WebApplicationException {
+ Writer writer = new BufferedWriter(new OutputStreamWriter(os));
+ CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+ try {
+
+ List<ColumnDescription> descriptions = resultSet.getDescriptions();
+ List<String> headers = Lists.newArrayList();
+ for (ColumnDescription description : descriptions) {
+ headers.add(description.getName());
+ }
+
+ csvPrinter.printRecord(headers.toArray());
+
+ while (resultSet.hasNext()) {
+ csvPrinter.printRecord(resultSet.next().getRow());
+ writer.flush();
+ }
+ } finally {
+ writer.close();
+ }
+ }
+ };
+
+ if (fileName == null || fileName.isEmpty()) {
+ fileName = "results.csv";
+ }
+
+ return Response.ok(stream).
+ header("Content-Disposition", String.format("attachment; filename=\"%s\"", fileName)).
+ build();
+
+
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Save job results as a CSV file to HDFS
+ */
+ @GET
+ @Path("{jobId}/results/csv/saveToHDFS")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getResultsToHDFS(@PathParam("jobId") String jobId,
+ @QueryParam("commence") String commence,
+ @QueryParam("file") final String targetFile,
+ @QueryParam("stop") final String stop,
+ @QueryParam("columns") final String requestedColumns,
+ @Context HttpServletResponse response) {
+ try {
+
+ final JobController jobController = getResourceManager().readController(jobId);
+ final String username = context.getUsername();
+
+ String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId());
+ if (commence != null && commence.equals("true")) {
+ if (targetFile == null)
+ throw new MisconfigurationFormattedException("targetFile should not be empty");
+
+ ConnectionSystem system = ConnectionSystem.getInstance();
+ final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+
+ Optional<NonPersistentCursor> cursorOptional = asyncJobRunner.resetAndGetCursor(jobId, username);
+
+ if(!cursorOptional.isPresent()){
+ throw new Exception("Download failed");
+ }
+
+ final NonPersistentCursor resultSet = cursorOptional.get();
+
+ BackgroundJobController.getInstance(context).startJob(String.valueOf(backgroundJobId), new Runnable() {
+ @Override
+ public void run() {
+
+ try {
+
+ FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true);
+ Writer writer = new BufferedWriter(new OutputStreamWriter(stream));
+ CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
+ try {
+ while (resultSet.hasNext() && !Thread.currentThread().isInterrupted()) {
+ csvPrinter.printRecord(resultSet.next().getRow());
+ writer.flush();
+ }
+ } finally {
+ writer.close();
+ }
+ stream.close();
+
+ } catch (IOException e) {
+ throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
+ } catch (InterruptedException e) {
+ throw new ServiceFormattedException("F010 Could not write CSV to HDFS for job#" + jobController.getJob().getId(), e);
+ }
+ }
+ });
+ }
+
+ if (stop != null && stop.equals("true")) {
+ BackgroundJobController.getInstance(context).interrupt(backgroundJobId);
+ }
+
+ JSONObject object = new JSONObject();
+ object.put("stopped", BackgroundJobController.getInstance(context).isInterrupted(backgroundJobId));
+ object.put("jobId", jobController.getJob().getId());
+ object.put("backgroundJobId", backgroundJobId);
+ object.put("operationType", "CSV2HDFS");
+ object.put("status", BackgroundJobController.getInstance(context).state(backgroundJobId).toString());
+
+ return Response.ok(object).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+
+ @Path("{jobId}/status")
+ @GET
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response fetchJobStatus(@PathParam("jobId") String jobId) throws ItemNotFound, HiveClientException, NoOperationStatusSetException {
+ JobController jobController = getResourceManager().readController(jobId);
+ Job job = jobController.getJob();
+ String jobStatus = job.getStatus();
+
+
+ LOG.info("jobStatus : {} for jobId : {}",jobStatus, jobId);
+
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("jobStatus", jobStatus);
+ jsonObject.put("jobId", jobId);
+
+ return Response.ok(jsonObject).build();
+ }
+
+ /**
+ * Get next results page
+ */
+ @GET
+ @Path("{jobId}/results")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getResults(@PathParam("jobId") final String jobId,
+ @QueryParam("first") final String fromBeginning,
+ @QueryParam("count") Integer count,
+ @QueryParam("searchId") String searchId,
+ @QueryParam("format") String format,
+ @QueryParam("columns") final String requestedColumns) {
+ try {
+
+ final String username = context.getUsername();
+
+ ConnectionSystem system = ConnectionSystem.getInstance();
+ final AsyncJobRunner asyncJobRunner = new AsyncJobRunnerImpl(context, system.getOperationController(context), system.getActorSystem());
+
+ return ResultsPaginationController.getInstance(context)
+ .request(jobId, searchId, true, fromBeginning, count, format,requestedColumns,
+ new Callable<Cursor< Row, ColumnDescription >>() {
+ @Override
+ public Cursor call() throws Exception {
+ Optional<NonPersistentCursor> cursor;
+ if(fromBeginning != null && fromBeginning.equals("true")){
+ cursor = asyncJobRunner.resetAndGetCursor(jobId, username);
+ }
+ else {
+ cursor = asyncJobRunner.getCursor(jobId, username);
+ }
+ if(cursor.isPresent())
+ return cursor.get();
+ else
+ return new EmptyCursor();
+ }
+ }).build();
+
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Renew expiration time for results
+ */
+ @GET
+ @Path("{jobId}/results/keepAlive")
+ public Response keepAliveResults(@PathParam("jobId") String jobId,
+ @QueryParam("first") String fromBeginning,
+ @QueryParam("count") Integer count) {
+ try {
+ if (!ResultsPaginationController.getInstance(context).keepAlive(jobId, ResultsPaginationController.DEFAULT_SEARCH_ID)) {
+ throw new NotFoundFormattedException("Results already expired", null);
+ }
+ return Response.ok().build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Get progress info
+ */
+ @GET
+ @Path("{jobId}/progress")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getProgress(@PathParam("jobId") String jobId) {
+ try {
+ final JobController jobController = getResourceManager().readController(jobId);
+
+ ProgressRetriever.Progress progress = new ProgressRetriever(jobController.getJob(), getSharedObjectsFactory()).
+ getProgress();
+
+ return Response.ok(progress).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Delete single item
+ */
+ @DELETE
+ @Path("{id}")
+ public Response delete(@PathParam("id") String id,
+ @QueryParam("remove") final String remove) {
+ try {
+ JobController jobController;
+ try {
+ jobController = getResourceManager().readController(id);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ }
+ jobController.cancel();
+ if (remove != null && "true".equals(remove)) {
+ getResourceManager().delete(id);
+ }
+ return Response.status(204).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Get all Jobs
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getList(@QueryParam("startTime") long startTime, @QueryParam("endTime") long endTime) {
+ try {
+
+ LOG.debug("Getting all job: startTime: {}, endTime: {}",startTime,endTime);
+ List<Job> allJobs = getAggregator().readAllForUserByTime(context.getUsername(),startTime, endTime);
+ for(Job job : allJobs) {
+ job.setSessionTag(null);
+ }
+ JSONObject result = new JSONObject();
+ result.put("jobs", allJobs);
+ return Response.ok(result).build();
+ } catch (WebApplicationException ex) {
+ LOG.error("Exception occured while fetching all jobs.", ex);
+ throw ex;
+ } catch (Exception ex) {
+ LOG.error("Exception occured while fetching all jobs.", ex);
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Fetches the jobs matching the given infos.
+ * Provide as much info about each job as possible so that the fetch can be optimized.
+ * @param jobInfos the identifying info of the jobs to fetch
+ * @return the updated jobs
+ */
+ @Path("/getList")
+ @POST
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ public List<Job> getList(List<JobInfo> jobInfos) {
+ try {
+ LOG.debug("fetching jobs with ids :{}", jobInfos);
+ List<Job> allJobs = getAggregator().readJobsByIds(jobInfos);
+ for(Job job : allJobs) {
+ job.setSessionTag(null);
+ }
+
+ return allJobs;
+ } catch (WebApplicationException ex) {
+ LOG.error("Exception occured while fetching all jobs.", ex);
+ throw ex;
+ } catch (Exception ex) {
+ LOG.error("Exception occured while fetching all jobs.", ex);
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Create job
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response create(JobRequest request, @Context HttpServletResponse response,
+ @Context UriInfo ui) {
+ try {
+ Map jobInfo = PropertyUtils.describe(request.job);
+ Job job = new JobImpl(jobInfo);
+ JobController createdJobController = new JobServiceInternal().createJob(job, getResourceManager());
+ JSONObject jobObject = jsonObjectFromJob(createdJobController);
+ response.setHeader("Location",
+ String.format("%s/%s", ui.getAbsolutePath().toString(), job.getId()));
+ return Response.ok(jobObject).status(201).build();
+ } catch (WebApplicationException ex) {
+ LOG.error("Error occurred while creating job : ",ex);
+ throw ex;
+ } catch (ItemNotFound itemNotFound) {
+ LOG.error("Error occurred while creating job : ",itemNotFound);
+ throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
+ } catch (Throwable ex) {
+ LOG.error("Error occurred while creating job : ",ex);
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Remove connection credentials
+ */
+ @DELETE
+ @Path("auth")
+ public Response removePassword() {
+ try {
+ //new UserLocalHiveAuthCredentials().remove(context);
+ //connectionLocal.remove(context); // force reconnect on next get
+ return Response.ok().status(200).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+
+ /**
+ * Invalidate session
+ */
+ @DELETE
+ @Path("sessions/{sessionTag}")
+ public Response invalidateSession(@PathParam("sessionTag") String sessionTag) {
+ try {
+ //Connection connection = connectionLocal.get(context);
+ //connection.invalidateSessionByTag(sessionTag);
+ return Response.ok().build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Session status
+ */
+ @GET
+ @Path("sessions/{sessionTag}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response sessionStatus(@PathParam("sessionTag") String sessionTag) {
+ try {
+ //Connection connection = connectionLocal.get(context);
+
+ JSONObject session = new JSONObject();
+ session.put("sessionTag", sessionTag);
+ try {
+ //connection.getSessionByTag(sessionTag);
+ session.put("actual", true);
+ } catch (Exception /*HiveClientException*/ ex) {
+ session.put("actual", false);
+ }
+
+ //TODO: New implementation
+
+ JSONObject status = new JSONObject();
+ status.put("session", session);
+ return Response.ok(status).build();
+ } catch (WebApplicationException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Wrapper object for json mapping
+ */
+ public static class JobRequest {
+ public JobImpl job;
+ }
+
+ /**
+ * Wrapper for authentication json mapping
+ */
+ public static class AuthRequest {
+ public String password;
+ }
+}
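
A hedged sketch of exercising the endpoints above with the standard JAX-RS 2.0 client API; the base URL and job id are hypothetical (in a real deployment the view framework serves these resources under the view instance's path):

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;
    import javax.ws.rs.client.Entity;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;

    public class JobServiceClientSketch {
      public static void main(String[] args) {
        Client client = ClientBuilder.newClient();
        String base = "http://ambari.example.com:8080/api/v1/views/HIVE/versions/2.0.0/"
            + "instances/AUTO_HIVE20_INSTANCE/resources/jobs"; // hypothetical view path

        // POST / -- create a job; title and queryFile are required (see class javadoc).
        String body = "{\"job\": {\"title\": \"Sample\", \"queryFile\": \"/user/admin/query.hql\"}}";
        Response created = client.target(base)
            .request(MediaType.APPLICATION_JSON)
            .post(Entity.json(body));

        // GET /:id -- read the job back; GET /:id/results?first=true pages its results.
        String job = client.target(base).path("1") // hypothetical job id
            .request(MediaType.APPLICATION_JSON)
            .get(String.class);

        System.out.println(created.getStatus() + "\n" + job);
      }
    }
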
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
new file mode 100644
index 0000000..1409ba8
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/JobServiceInternal.java
@@ -0,0 +1,35 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobController;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.JobResourceManager;
+
+public class JobServiceInternal {
+ public JobController createJob(Job job, JobResourceManager resourceManager) throws Throwable {
+ resourceManager.create(job);
+
+ JobController createdJobController = resourceManager.readController(job.getId());
+ createdJobController.submit();
+ resourceManager.saveIfModified(createdJobController);
+ return createdJobController;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java
new file mode 100644
index 0000000..073cdc7
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationDelegate.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+public interface ModifyNotificationDelegate {
+ boolean onModification(Object object);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java
new file mode 100644
index 0000000..51058f5
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ModifyNotificationInvocationHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+public class ModifyNotificationInvocationHandler implements InvocationHandler {
+ private Object proxied;
+ private ModifyNotificationDelegate modifyDelegate;
+
+ public ModifyNotificationInvocationHandler(Object proxied, ModifyNotificationDelegate delegate) {
+ this.proxied = proxied;
+ this.modifyDelegate = delegate;
+ }
+
+ @Override
+ public Object invoke(Object o, Method method, Object[] args) throws Throwable {
+ if (method.getName().startsWith("set")) {
+ modifyDelegate.onModification(proxied);
+ }
+ return method.invoke(proxied, args);
+ }
+}
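
A sketch of wiring this handler with java.lang.reflect.Proxy, assuming Job is the interface JobImpl implements (the Aggregator code above casts Indexed items to Job, so treating it as a proxyable interface seems safe); any setter call on the proxy notifies the delegate before being forwarded to the target:

    import java.lang.reflect.Proxy;
    import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;

    public class ModifyNotificationSketch {
      public static Job wrap(final Job target) {
        ModifyNotificationDelegate delegate = new ModifyNotificationDelegate() {
          @Override
          public boolean onModification(Object modified) {
            System.out.println("Job modified: " + modified);
            return true; // return value is ignored by the handler above
          }
        };
        return (Job) Proxy.newProxyInstance(
            Job.class.getClassLoader(),
            new Class<?>[]{Job.class},
            new ModifyNotificationInvocationHandler(target, delegate));
      }
    }
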
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java
new file mode 100644
index 0000000..31d97d0
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/NoOperationStatusSetException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+/**
+ * Thrown when no operation status has been set for a job yet.
+ */
+public class NoOperationStatusSetException extends Exception {
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java
new file mode 100644
index 0000000..4d8c7d7
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ProgressRetriever.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import org.apache.ambari.view.hive20.resources.jobs.atsJobs.TezVertexId;
+import org.apache.ambari.view.hive20.resources.jobs.rm.RMParser;
+import org.apache.ambari.view.hive20.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
+import org.apache.ambari.view.hive20.utils.SharedObjectsFactory;
+
+import java.util.List;
+
+public class ProgressRetriever {
+ private final Progress progress;
+ private final Job job;
+ private final SharedObjectsFactory sharedObjects;
+
+ public ProgressRetriever(Job job, SharedObjectsFactory sharedObjects) {
+ this.job = job;
+ this.sharedObjects = sharedObjects;
+
+ this.progress = new Progress();
+ }
+
+ /**
+ * Collects overall DAG progress from the RM and per-vertex progress for
+ * the vertices that ATS reports for the job's DAG.
+ */
+ public Progress getProgress() {
+ jobCheck();
+
+ progress.dagProgress = sharedObjects.getRMParser().getDAGProgress(
+ job.getApplicationId(), job.getDagId());
+
+ List<TezVertexId> vertices = sharedObjects.getATSParser().getVerticesForDAGId(job.getDagId());
+ progress.vertexProgresses = sharedObjects.getRMParser().getDAGVerticesProgress(job.getApplicationId(), job.getDagId(), vertices);
+
+ return progress;
+ }
+
+ /**
+ * Fails fast if the job does not yet have the application id and DAG id
+ * needed to query progress.
+ */
+ public void jobCheck() {
+ if (job.getApplicationId() == null || job.getApplicationId().isEmpty()) {
+ throw new ServiceFormattedException("E070 ApplicationId is not defined yet");
+ }
+ if (job.getDagId() == null || job.getDagId().isEmpty()) {
+ throw new ServiceFormattedException("E080 DagID is not defined yet");
+ }
+ }
+
+ public static class Progress {
+ public Double dagProgress;
+ public List<RMParser.VertexProgress> vertexProgresses;
+ }
+}
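A hedged sketch of how a jobs endpoint might surface this progress; the job and context variables are assumed to come from the surrounding service and are not defined in this patch.

    // Assumes an already-loaded Job and the view's injected ViewContext.
    ProgressRetriever.Progress progress =
        new ProgressRetriever(job, new SharedObjectsFactory(context)).getProgress();
    System.out.println("DAG progress: " + progress.dagProgress);
    for (RMParser.VertexProgress vertexProgress : progress.vertexProgresses) {
      System.out.println("vertex: " + vertexProgress);
    }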
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java
new file mode 100644
index 0000000..6efa2a9
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/ResultsPaginationController.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive20.client.ColumnDescription;
+import org.apache.ambari.view.hive20.client.Cursor;
+import org.apache.ambari.view.hive20.client.HiveClientException;
+import org.apache.ambari.view.hive20.client.Row;
+import org.apache.ambari.view.hive20.utils.BadRequestFormattedException;
+import org.apache.ambari.view.hive20.utils.ResultFetchFormattedException;
+import org.apache.ambari.view.hive20.utils.ResultNotReadyFormattedException;
+import org.apache.ambari.view.hive20.utils.ServiceFormattedException;
+import org.apache.commons.collections4.map.PassiveExpiringMap;
+import org.apache.hadoop.hbase.util.Strings;
+
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * Results Pagination Controller
+ * Persists cursors for result sets
+ */
+public class ResultsPaginationController {
+ public static final String DEFAULT_SEARCH_ID = "default";
+ private static Map<String, ResultsPaginationController> viewSingletonObjects = new HashMap<>();
+
+ /**
+ * Returns the per-view-instance controller. Synchronized because the backing
+ * map is shared across request threads.
+ */
+ public static synchronized ResultsPaginationController getInstance(ViewContext context) {
+ if (!viewSingletonObjects.containsKey(context.getInstanceName()))
+ viewSingletonObjects.put(context.getInstanceName(), new ResultsPaginationController());
+ return viewSingletonObjects.get(context.getInstanceName());
+ }
+
+ public ResultsPaginationController() {
+ }
+
+ private static final long EXPIRING_TIME = 10*60*1000; // 10 minutes
+ private static final int DEFAULT_FETCH_COUNT = 50;
+ private Map<String, Cursor<Row, ColumnDescription>> resultsCache;
+
+ /**
+ * Expiration policy that keeps "$"-prefixed keys forever (used for results
+ * that must never expire) and applies the constant time-to-live otherwise.
+ */
+ public static class CustomTimeToLiveExpirationPolicy extends PassiveExpiringMap.ConstantTimeToLiveExpirationPolicy<String, Cursor<Row, ColumnDescription>> {
+ public CustomTimeToLiveExpirationPolicy(long timeToLiveMillis) {
+ super(timeToLiveMillis);
+ }
+
+ @Override
+ public long expirationTime(String key, Cursor<Row, ColumnDescription> value) {
+ if (key.startsWith("$")) {
+ return -1; // never expire
+ }
+ return super.expirationTime(key, value);
+ }
+ }
+
+ private Map<String, Cursor<Row, ColumnDescription>> getResultsCache() {
+ if (resultsCache == null) {
+ PassiveExpiringMap<String, Cursor<Row, ColumnDescription>> resultsCacheExpiringMap =
+ new PassiveExpiringMap<>(new CustomTimeToLiveExpirationPolicy(EXPIRING_TIME));
+ resultsCache = Collections.synchronizedMap(resultsCacheExpiringMap);
+ }
+ return resultsCache;
+ }
+
+ /**
+ * Renews the expiration timer of a cached cursor.
+ * @param key name/id of the results request
+ * @param searchId search id within the request; defaults to DEFAULT_SEARCH_ID
+ * @return false if the entry was not found; true if it was renewed
+ */
+ public boolean keepAlive(String key, String searchId) {
+ if (searchId == null)
+ searchId = DEFAULT_SEARCH_ID;
+ String effectiveKey = key + "?" + searchId;
+ if (!getResultsCache().containsKey(effectiveKey)) {
+ return false;
+ }
+ Cursor<Row, ColumnDescription> cursor = getResultsCache().get(effectiveKey);
+ // Re-putting the entry resets its time-to-live in the PassiveExpiringMap.
+ getResultsCache().put(effectiveKey, cursor);
+ cursor.keepAlive();
+ return true;
+ }
+
+ private Cursor<Row, ColumnDescription> getResultsSet(String key, Callable<Cursor<Row, ColumnDescription>> makeResultsSet) {
+ if (!getResultsCache().containsKey(key)) {
+ Cursor<Row, ColumnDescription> resultSet;
+ try {
+ resultSet = makeResultsSet.call();
+ if (resultSet.isResettable()) {
+ resultSet.reset();
+ }
+ } catch (ResultNotReadyFormattedException | ResultFetchFormattedException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new ServiceFormattedException(ex.getMessage(), ex);
+ }
+ getResultsCache().put(key, resultSet);
+ }
+
+ return getResultsCache().get(key);
+ }
+
+ public Response.ResponseBuilder request(String key, String searchId, boolean canExpire, String fromBeginning, Integer count, String format, String requestedColumns, Callable<Cursor<Row, ColumnDescription>> makeResultsSet) throws HiveClientException {
+ if (searchId == null)
+ searchId = DEFAULT_SEARCH_ID;
+ key = key + "?" + searchId;
+ if (!canExpire)
+ key = "$" + key; // "$"-prefixed keys never expire, see CustomTimeToLiveExpirationPolicy
+ if ("true".equals(fromBeginning) && getResultsCache().containsKey(key)) {
+ getResultsCache().remove(key);
+ }
+
+ Cursor<Row, ColumnDescription> resultSet = getResultsSet(key, makeResultsSet);
+
+ if (count == null)
+ count = DEFAULT_FETCH_COUNT;
+
+ List<ColumnDescription> allSchema = resultSet.getDescriptions();
+ List<Row> allRowEntries = FluentIterable.from(resultSet)
+ .limit(count).toList();
+
+ List<ColumnDescription> schema = allSchema;
+
+ final Set<Integer> selectedColumns = getRequestedColumns(requestedColumns);
+ if (!selectedColumns.isEmpty()) {
+ schema = filter(allSchema, selectedColumns);
+ }
+
+ List<Object[]> rows = FluentIterable.from(allRowEntries)
+ .transform(new Function<Row, Object[]>() {
+ @Override
+ public Object[] apply(Row input) {
+ if(!selectedColumns.isEmpty()) {
+ return filter(Lists.newArrayList(input.getRow()), selectedColumns).toArray();
+ } else {
+ return input.getRow();
+ }
+ }
+ }).toList();
+
+ int read = rows.size();
+ // The "d3" format renders each row as a {columnName: value} map, with dots
+ // in column names replaced by underscores.
+ if (format != null && format.equalsIgnoreCase("d3")) {
+ List<Map<String, Object>> results = new ArrayList<>();
+ for (int i = 0; i < rows.size(); i++) {
+ Object[] row = rows.get(i);
+ Map<String, Object> keyValue = new HashMap<>(row.length);
+ for (int j = 0; j < row.length; j++) {
+ String schemaName = schema.get(j).getName();
+ keyValue.put(schemaName.replace('.', '_'), row[j]);
+ }
+ results.add(keyValue);
+ }
+ return Response.ok(results);
+ } else {
+ ResultsResponse resultsResponse = new ResultsResponse();
+ resultsResponse.setSchema(schema);
+ resultsResponse.setRows(rows);
+ resultsResponse.setReadCount(read);
+ resultsResponse.setHasNext(resultSet.hasNext());
+ // resultsResponse.setSize(resultSet.size());
+ resultsResponse.setOffset(resultSet.getOffset());
+ resultsResponse.setHasResults(true);
+ return Response.ok(resultsResponse);
+ }
+ }
+
+ private <T> List<T> filter(List<T> list, Set<Integer> selectedColumns) {
+ List<T> filtered = Lists.newArrayList();
+ for(int i: selectedColumns) {
+ if(list != null && list.get(i) != null)
+ filtered.add(list.get(i));
+ }
+
+ return filtered;
+ }
+
+ private Set<Integer> getRequestedColumns(String requestedColumns) {
+ if(Strings.isEmpty(requestedColumns)) {
+ return new HashSet<>();
+ }
+ Set<Integer> selectedColumns = Sets.newHashSet();
+ for (String columnRequested : requestedColumns.split(",")) {
+ try {
+ selectedColumns.add(Integer.parseInt(columnRequested));
+ } catch (NumberFormatException ex) {
+ throw new BadRequestFormattedException("Columns param should be comma-separated integers", ex);
+ }
+ }
+ return selectedColumns;
+ }
+
+ private static class ResultsResponse {
+ private List<ColumnDescription> schema;
+ private List<String[]> rows;
+ private int readCount;
+ private boolean hasNext;
+ private long offset;
+ private boolean hasResults;
+
+ public void setSchema(List<ColumnDescription> schema) {
+ this.schema = schema;
+ }
+
+ public List<ColumnDescription> getSchema() {
+ return schema;
+ }
+
+ public void setRows(List<Object[]> rows) {
+ if (null == rows) {
+ this.rows = null;
+ return; // without this return, the loop below would dereference null
+ }
+ this.rows = new ArrayList<String[]>(rows.size());
+ for (Object[] row : rows) {
+ String[] strs = new String[row.length];
+ for (int colNum = 0; colNum < row.length; colNum++) {
+ String value = String.valueOf(row[colNum]);
+ // Quote non-null values that would otherwise be ambiguous in the output
+ // ("" or the literal string "null"), so clients can tell them apart
+ // from actual SQL NULLs.
+ if (row[colNum] != null && (value.isEmpty() || value.equalsIgnoreCase("null"))) {
+ strs[colNum] = String.format("\"%s\"", value);
+ } else {
+ strs[colNum] = value;
+ }
+ }
+ this.rows.add(strs);
+ }
+ }
+
+ public List<String[]> getRows() {
+ return rows;
+ }
+
+ public void setReadCount(int readCount) {
+ this.readCount = readCount;
+ }
+
+ public void setHasNext(boolean hasNext) {
+ this.hasNext = hasNext;
+ }
+
+ public boolean isHasNext() {
+ return hasNext;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+ public boolean getHasResults() {
+ return hasResults;
+ }
+
+ public void setHasResults(boolean hasResults) {
+ this.hasResults = hasResults;
+ }
+ }
+}
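A hedged sketch of how a service method might drive the controller; jobId, context, and the openCursor() helper are illustrative assumptions rather than APIs introduced by this patch.

    // Page through results, creating the cursor lazily on first request.
    Response.ResponseBuilder builder = ResultsPaginationController.getInstance(context)
        .request(jobId, null /* searchId */, true /* canExpire */,
            "false" /* fromBeginning */, 50 /* count */, null /* default format */,
            null /* all columns */,
            new Callable<Cursor<Row, ColumnDescription>>() {
              @Override
              public Cursor<Row, ColumnDescription> call() throws Exception {
                return openCursor(jobId);  // assumed helper that opens the result set
              }
            });
    return builder.build();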
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java
new file mode 100644
index 0000000..6e9753d
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParser.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs.atsJobs;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Parser of ATS responses
+ */
+public class ATSParser implements IATSParser {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(ATSParser.class);
+
+ private ATSRequestsDelegate delegate;
+
+ private static final long MILLIS_IN_SECOND = 1000L;
+
+ public ATSParser(ATSRequestsDelegate delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Returns all HiveQueryIds from ATS for the given user.
+ * @param username user whose queries to fetch
+ * @return list of parsed HiveQueryId entities
+ */
+ @Override
+ public List<HiveQueryId> getHiveQueryIdsForUser(String username) {
+ JSONObject entities = delegate.hiveQueryIdsForUser(username);
+ return parseHqidJsonFromATS(entities);
+ }
+
+ /**
+ * Parses hive query IDs from an ATS response.
+ * @param entities JSON object expected to contain an 'entities' JSONArray
+ * @return list of parsed HiveQueryId entities
+ */
+ private List<HiveQueryId> parseHqidJsonFromATS(JSONObject entities) {
+ JSONArray jobs = (JSONArray) entities.get("entities");
+
+ return getHqidListFromJsonArray(jobs);
+ }
+
+ /**
+ * Parses a list of HiveQueryIds from a JSON array; entries that fail to
+ * parse are logged and skipped.
+ * @param jobs JSON array of ATS entities
+ * @return list of parsed HiveQueryId entities
+ */
+ private List<HiveQueryId> getHqidListFromJsonArray(JSONArray jobs) {
+ List<HiveQueryId> parsedJobs = new LinkedList<>();
+ for (Object job : jobs) {
+ try {
+ HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job);
+ parsedJobs.add(parsedJob);
+ } catch (Exception ex) {
+ LOG.error("Error while parsing ATS job", ex);
+ }
+ }
+
+ return parsedJobs;
+ }
+
+ @Override
+ public List<TezVertexId> getVerticesForDAGId(String dagId) {
+ JSONObject entities = delegate.tezVerticesListForDAG(dagId);
+ JSONArray vertices = (JSONArray) entities.get("entities");
+
+ List<TezVertexId> parsedVertices = new LinkedList<TezVertexId>();
+ for(Object vertex : vertices) {
+ try {
+ TezVertexId parsedVertex = parseVertex((JSONObject) vertex);
+ parsedVertices.add(parsedVertex);
+ } catch (Exception ex) {
+ LOG.error("Error while parsing the vertex", ex);
+ }
+ }
+
+ return parsedVertices;
+ }
+
+ @Override
+ public HiveQueryId getHiveQueryIdByOperationId(String guidString) {
+ JSONObject entities = delegate.hiveQueryIdByOperationId(guidString);
+ return getHiveQueryIdFromJson(entities);
+ }
+
+ private HiveQueryId getHiveQueryIdFromJson(JSONObject entities) {
+ JSONArray jobs = (JSONArray) entities.get("entities");
+
+ if (jobs.size() == 0) {
+ return new HiveQueryId();
+ }
+
+ return parseAtsHiveJob((JSONObject) jobs.get(0));
+ }
+
+ /**
+ * Returns the hive entity from ATS, or an empty object if not found.
+ * @param hiveId entity id of the hive query
+ * @return the HiveQueryId, empty if not found
+ */
+ @Override
+ public HiveQueryId getHiveQueryIdByHiveEntityId(String hiveId) {
+ JSONObject entity = delegate.hiveQueryEntityByEntityId(hiveId);
+ return parseAtsHiveJob(entity);
+ }
+
+ @Override
+ public TezDagId getTezDAGByName(String name) {
+ JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities");
+ return parseTezDag(tezDagEntities);
+ }
+
+ @Override
+ public TezDagId getTezDAGByEntity(String entity) {
+ JSONArray tezDagEntities = (JSONArray) delegate.tezDagByEntity(entity).get("entities");
+ return parseTezDag(tezDagEntities);
+ }
+
+ /**
+ * Fetches the HIVE_QUERY_IDs from ATS for a given user in a given time window.
+ * @param username user whose hive query IDs to fetch
+ * @param startTime window start in milliseconds, inclusive
+ * @param endTime window end in milliseconds, exclusive
+ * @return list of HiveQueryId entities
+ */
+ @Override
+ public List<HiveQueryId> getHiveQueryIdsForUserByTime(String username, long startTime, long endTime) {
+ JSONObject entities = delegate.hiveQueryIdsForUserByTime(username, startTime, endTime);
+ return parseHqidJsonFromATS(entities);
+ }
+
+ @Override
+ public List<HiveQueryId> getHiveQueryIdByEntityList(List<String> hiveIds) {
+ List<HiveQueryId> hiveQueryIds = new LinkedList<>();
+ for (String id : hiveIds) {
+ HiveQueryId hqi = this.getHiveQueryIdByHiveEntityId(id);
+ if (null != hqi.entity) {
+ hiveQueryIds.add(hqi);
+ }
+ }
+ return hiveQueryIds;
+ }
+
+ private TezDagId parseTezDag(JSONArray tezDagEntities) {
+ assert tezDagEntities.size() <= 1;
+ if (tezDagEntities.size() == 0) {
+ return new TezDagId();
+ }
+ JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0);
+
+ TezDagId parsedDag = new TezDagId();
+ JSONArray applicationIds = (JSONArray) ((JSONObject) tezDagEntity.get("primaryfilters")).get("applicationId");
+ parsedDag.entity = (String) tezDagEntity.get("entity");
+ parsedDag.applicationId = (String) applicationIds.get(0);
+ parsedDag.status = (String) ((JSONObject) tezDagEntity.get("otherinfo")).get("status");
+ return parsedDag;
+ }
+
+ private HiveQueryId parseAtsHiveJob(JSONObject job) {
+ HiveQueryId parsedJob = new HiveQueryId();
+
+ parsedJob.entity = (String) job.get("entity");
+ parsedJob.url = delegate.hiveQueryIdDirectUrl((String) job.get("entity"));
+ parsedJob.starttime = ((Long) job.get("starttime"));
+
+ JSONObject primaryfilters = (JSONObject) job.get("primaryfilters");
+ JSONArray operationIds = (JSONArray) primaryfilters.get("operationid");
+ if (operationIds != null) {
+ parsedJob.operationId = (String) (operationIds).get(0);
+ }
+ JSONArray users = (JSONArray) primaryfilters.get("user");
+ if (users != null) {
+ parsedJob.user = (String) (users).get(0);
+ }
+
+ JSONObject lastEvent = getLastEvent(job);
+ long lastEventTimestamp = ((Long) lastEvent.get("timestamp"));
+
+ parsedJob.duration = (lastEventTimestamp - parsedJob.starttime) / MILLIS_IN_SECOND;
+
+ JSONObject otherinfo = (JSONObject) job.get("otherinfo");
+ if (otherinfo.get("QUERY") != null) { // workaround for HIVE-10829
+ JSONObject query = (JSONObject) JSONValue.parse((String) otherinfo.get("QUERY"));
+
+ parsedJob.query = (String) query.get("queryText");
+ JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS");
+
+ List<String> dagIds = new LinkedList<String>();
+ List<JSONObject> stagesList = new LinkedList<JSONObject>();
+
+ for (Object key : stages.keySet()) {
+ JSONObject stage = (JSONObject) stages.get(key);
+ if (stage.get("Tez") != null) {
+ String dagId = (String) ((JSONObject) stage.get("Tez")).get("DagId:");
+ dagIds.add(dagId);
+ }
+ stagesList.add(stage);
+ }
+ parsedJob.dagNames = dagIds;
+ parsedJob.stages = stagesList;
+ }
+
+ if (otherinfo.get("VERSION") != null) {
+ parsedJob.version = (Long) otherinfo.get("VERSION");
+ }
+ return parsedJob;
+ }
+
+ private TezVertexId parseVertex(JSONObject vertex) {
+ TezVertexId tezVertexId = new TezVertexId();
+ tezVertexId.entity = (String) vertex.get("entity");
+ JSONObject otherinfo = (JSONObject) vertex.get("otherinfo");
+ if (otherinfo != null) {
+ tezVertexId.vertexName = (String) otherinfo.get("vertexName");
+ }
+ return tezVertexId;
+ }
+
+ private JSONObject getLastEvent(JSONObject atsEntity) {
+ JSONArray events = (JSONArray) atsEntity.get("events");
+ // ATS lists events most-recent-first, so the first element is the last event.
+ return (JSONObject) events.get(0);
+ }
+}
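To make the field accesses in parseAtsHiveJob easier to follow, here is a self-contained demo of the entity shape the parser expects; the JSON is illustrative, with field names taken from the code above and made-up values.

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class AtsEntityShapeDemo {
      public static void main(String[] args) {
        String json = "{\"entities\":[{"
            + "\"entity\":\"admin_20170104120000_abc\","
            + "\"starttime\":1483500000000,"
            + "\"events\":[{\"timestamp\":1483500005000}],"
            + "\"primaryfilters\":{\"user\":[\"admin\"],\"operationid\":[\"op-1\"]},"
            + "\"otherinfo\":{}}]}";
        JSONObject response = (JSONObject) JSONValue.parse(json);
        JSONObject first = (JSONObject) ((JSONArray) response.get("entities")).get(0);
        long starttime = (Long) first.get("starttime");
        long lastEvent = (Long) ((JSONObject) ((JSONArray) first.get("events")).get(0)).get("timestamp");
        // Mirrors the duration computation above: (last event - start) / 1000.
        System.out.println("duration(s) = " + (lastEvent - starttime) / 1000L);  // prints 5
      }
    }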
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java
new file mode 100644
index 0000000..343202e
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSParserFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.ViewContext;
+
+public class ATSParserFactory {
+
+ private ViewContext context;
+
+ public ATSParserFactory(ViewContext context) {
+ this.context = context;
+ }
+
+ public ATSParser getATSParser() {
+ ATSRequestsDelegateImpl delegate = new ATSRequestsDelegateImpl(context, getATSUrl(context));
+ return new ATSParser(delegate);
+ }
+
+ public static String getATSUrl(ViewContext context) {
+ return context.getProperties().get("yarn.ats.url");
+ }
+}
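A brief usage sketch, assuming an injected ViewContext whose instance configuration carries the yarn.ats.url property the factory reads.

    // Build a parser bound to this view instance's ATS endpoint and list
    // the current user's queries.
    ATSParser parser = new ATSParserFactory(context).getATSParser();
    List<HiveQueryId> ids = parser.getHiveQueryIdsForUser(context.getUsername());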
http://git-wip-us.apache.org/repos/asf/ambari/blob/853a1ce7/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java
new file mode 100644
index 0000000..dac42aa
--- /dev/null
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/resources/jobs/atsJobs/ATSRequestsDelegate.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive20.resources.jobs.atsJobs;
+
+import org.json.simple.JSONObject;
+
+/**
+ * Abstracts the requests made to the YARN Application Timeline Server (ATS),
+ * keeping ATSParser independent of the HTTP layer and testable with canned
+ * responses.
+ */
+public interface ATSRequestsDelegate {
+ String hiveQueryIdDirectUrl(String entity);
+
+ String hiveQueryIdOperationIdUrl(String operationId);
+
+ String tezDagDirectUrl(String entity);
+
+ String tezDagNameUrl(String name);
+
+ String tezVerticesListForDAGUrl(String dagId);
+
+ JSONObject hiveQueryIdsForUser(String username);
+
+ JSONObject hiveQueryIdByOperationId(String operationId);
+
+ JSONObject tezDagByName(String name);
+
+ JSONObject tezVerticesListForDAG(String dagId);
+
+ JSONObject tezDagByEntity(String entity);
+
+ JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime);
+
+ JSONObject hiveQueryEntityByEntityId(String hiveEntityId);
+}
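Because ATSParser sees only this interface, a canned-response stub is enough for unit tests. A hedged sketch follows; the JSON payload and the URL are placeholders, not values from this patch.

    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    // Inside a test:
    ATSRequestsDelegate stub = new ATSRequestsDelegate() {
      private JSONObject empty() { return (JSONObject) JSONValue.parse("{\"entities\":[]}"); }
      @Override public String hiveQueryIdDirectUrl(String entity) { return "http://ats.example/" + entity; }
      @Override public String hiveQueryIdOperationIdUrl(String operationId) { return ""; }
      @Override public String tezDagDirectUrl(String entity) { return ""; }
      @Override public String tezDagNameUrl(String name) { return ""; }
      @Override public String tezVerticesListForDAGUrl(String dagId) { return ""; }
      @Override public JSONObject hiveQueryIdsForUser(String username) { return empty(); }
      @Override public JSONObject hiveQueryIdByOperationId(String operationId) { return empty(); }
      @Override public JSONObject tezDagByName(String name) { return empty(); }
      @Override public JSONObject tezVerticesListForDAG(String dagId) { return empty(); }
      @Override public JSONObject tezDagByEntity(String entity) { return empty(); }
      @Override public JSONObject hiveQueryIdsForUserByTime(String username, long startTime, long endTime) { return empty(); }
      @Override public JSONObject hiveQueryEntityByEntityId(String hiveEntityId) { return empty(); }
    };
    // new ATSParser(stub).getHiveQueryIdsForUser("admin") now returns an empty list.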