You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by al...@apache.org on 2015/03/12 21:23:04 UTC
[4/5] ambari git commit: AMBARI-10035. Hive View: Retrieve history
from ATS (alexantonenko)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java
deleted file mode 100644
index a9f315c..0000000
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobControllerImpl.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive.resources.jobs;
-
-import org.apache.ambari.view.ViewContext;
-import org.apache.ambari.view.hive.client.*;
-import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive.resources.savedQueries.SavedQuery;
-import org.apache.ambari.view.hive.resources.savedQueries.SavedQueryResourceManager;
-import org.apache.ambari.view.hive.utils.*;
-import org.apache.ambari.view.hive.utils.HdfsApi;
-import org.apache.ambari.view.hive.utils.HdfsUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Proxy;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-public class JobControllerImpl implements JobController, ModifyNotificationDelegate {
- private final static Logger LOG =
- LoggerFactory.getLogger(JobControllerImpl.class);
-
- private ViewContext context;
- private Job jobUnproxied;
- private Job job;
- private boolean modified;
-
- private OperationHandleControllerFactory operationHandleControllerFactory;
- private ConnectionController hiveSession;
- private SavedQueryResourceManager savedQueryResourceManager;
-
- /**
- * JobController constructor
- * Warning: Create JobControllers ONLY using JobControllerFactory!
- */
- public JobControllerImpl(ViewContext context, Job job) {
- this.context = context;
- setJobPOJO(job);
- operationHandleControllerFactory = OperationHandleControllerFactory.getInstance(context);
- hiveSession = ConnectionController.getInstance(context);
- savedQueryResourceManager = SavedQueryResourceManager.getInstance(context);
- }
-
- public String getQueryForJob() {
- FilePaginator paginator = new FilePaginator(job.getQueryFile(), context);
- String query;
- try {
- query = paginator.readPage(0); //warning - reading only 0 page restricts size of query to 1MB
- } catch (IOException e) {
- throw new ServiceFormattedException("Error when reading file: " + e.toString(), e);
- } catch (InterruptedException e) {
- throw new ServiceFormattedException("Error when reading file: " + e.toString(), e);
- }
- return query;
- }
-
- private static final String DEFAULT_DB = "default";
- public String getJobDatabase() {
- if (job.getDataBase() != null) {
- return job.getDataBase();
- } else {
- return DEFAULT_DB;
- }
- }
-
- @Override
- public void submit() {
- setupHiveBeforeQueryExecute();
-
- String query = getQueryForJob();
- OperationHandleController handleController = hiveSession.executeQuery(query);
-
- handleController.persistHandleForJob(job);
- }
-
- private void setupHiveBeforeQueryExecute() {
- String database = getJobDatabase();
- hiveSession.selectDatabase(database);
- }
-
- @Override
- public void cancel() throws ItemNotFound {
- OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job);
- handle.cancel();
- }
-
- @Override
- public void onRead() {
- updateOperationStatus();
- updateOperationLogs();
-
- updateJobDuration();
- }
-
- public void updateOperationStatus() {
- try {
-
- OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job);
- String status = handle.getOperationStatus();
- job.setStatus(status);
- LOG.debug("Status of job#" + job.getId() + " is " + job.getStatus());
-
- } catch (NoOperationStatusSetException e) {
- LOG.info("Operation state is not set for job#" + job.getId());
-
- } catch (HiveErrorStatusException e) {
- LOG.debug("Error updating status for job#" + job.getId() + ": " + e.getMessage());
- job.setStatus(Job.JOB_STATE_UNKNOWN);
-
- } catch (HiveClientException e) {
- throw new ServiceFormattedException("Could not fetch job status " + job.getId(), e);
-
- } catch (ItemNotFound itemNotFound) {
- LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't update status");
- }
- }
-
- public void updateOperationLogs() {
- try {
- OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job);
- String logs = handle.getLogs();
-
-// LogParser info = LogParser.parseLog(logs);
-
- String logFilePath = job.getLogFile();
- HdfsUtil.putStringToFile(context, logFilePath, logs);
-
- } catch (HiveClientRuntimeException ex) {
- LOG.error("Error while fetching logs: " + ex.getMessage());
- } catch (ItemNotFound itemNotFound) {
- LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't read logs");
- }
- }
-
- public boolean isJobEnded() {
- String status = job.getStatus();
- return status.equals(Job.JOB_STATE_FINISHED) || status.equals(Job.JOB_STATE_CANCELED) ||
- status.equals(Job.JOB_STATE_CLOSED) || status.equals(Job.JOB_STATE_ERROR) ||
- status.equals(Job.JOB_STATE_UNKNOWN); // Unknown is not finished, but polling makes no sense
- }
-
- @Override
- public Job getJob() {
- return job;
- }
-
- /**
- * Use carefully. Returns unproxied bean object
- * @return unproxied bean object
- */
- @Override
- public Job getJobPOJO() {
- return jobUnproxied;
- }
-
- public void setJobPOJO(Job jobPOJO) {
- Job jobModifyNotificationProxy = (Job) Proxy.newProxyInstance(jobPOJO.getClass().getClassLoader(),
- new Class[]{Job.class},
- new ModifyNotificationInvocationHandler(jobPOJO, this));
- this.job = jobModifyNotificationProxy;
-
- this.jobUnproxied = jobPOJO;
- }
-
- @Override
- public Cursor getResults() throws ItemNotFound {
- OperationHandleController handle = operationHandleControllerFactory.getHandleForJob(job);
- return handle.getResults();
- }
-
- @Override
- public void afterCreation() {
- setupStatusDirIfNotPresent();
- setupQueryFileIfNotPresent();
- setupLogFileIfNotPresent();
-
- setCreationDate();
- }
-
- public void setupLogFileIfNotPresent() {
- if (job.getLogFile() == null || job.getLogFile().isEmpty()) {
- setupLogFile();
- }
- }
-
- public void setupQueryFileIfNotPresent() {
- if (job.getQueryFile() == null || job.getQueryFile().isEmpty()) {
- setupQueryFile();
- }
- }
-
- public void setupStatusDirIfNotPresent() {
- if (job.getStatusDir() == null || job.getStatusDir().isEmpty()) {
- setupStatusDir();
- }
- }
-
- private static final long MillisInSecond = 1000L;
-
- public void updateJobDuration() {
- job.setDuration(System.currentTimeMillis() / MillisInSecond - job.getDateSubmitted());
- }
-
- public void setCreationDate() {
- job.setDateSubmitted(System.currentTimeMillis() / MillisInSecond);
- }
-
-
-
- private void setupLogFile() {
- LOG.debug("Creating log file for job#" + job.getId());
-
- String logFile = job.getStatusDir() + "/" + "logs";
- HdfsUtil.putStringToFile(context, logFile, "");
-
- job.setLogFile(logFile);
- LOG.debug("Log file for job#" + job.getId() + ": " + logFile);
- }
-
- private void setupStatusDir() {
- String newDirPrefix = makeStatusDirectoryPrefix();
- String newDir = HdfsUtil.findUnallocatedFileName(context, newDirPrefix, "");
-
- job.setStatusDir(newDir);
- LOG.debug("Status dir for job#" + job.getId() + ": " + newDir);
- }
-
- private String makeStatusDirectoryPrefix() {
- String userScriptsPath = context.getProperties().get("jobs.dir");
-
- if (userScriptsPath == null) { // TODO: move check to initialization code
- String msg = "jobs.dir is not configured!";
- LOG.error(msg);
- throw new MisconfigurationFormattedException("jobs.dir");
- }
-
- String normalizedName = String.format("hive-job-%d", job.getId());
- String timestamp = new SimpleDateFormat("yyyy-MM-dd_hh-mm").format(new Date());
- return String.format(userScriptsPath +
- "/%s-%s", normalizedName, timestamp);
- }
-
- private void setupQueryFile() {
- String statusDir = job.getStatusDir();
- assert statusDir != null : "setupStatusDir() should be called first";
-
- String jobQueryFilePath = statusDir + "/" + "query.hql";
-
- try {
-
- if (job.getForcedContent() != null) {
-
- HdfsUtil.putStringToFile(context, jobQueryFilePath, job.getForcedContent());
- job.setForcedContent(""); // prevent forcedContent to be written to DB
-
- }
- else if (job.getQueryId() != null) {
-
- String savedQueryFile = getRelatedSavedQueryFile();
- HdfsApi.getInstance(context).copy(savedQueryFile, jobQueryFilePath);
- job.setQueryFile(jobQueryFilePath);
-
- } else {
-
- throw new BadRequestFormattedException("queryId or forcedContent should be passed!", null);
-
- }
-
- } catch (IOException e) {
- throw new ServiceFormattedException("Error in creation: " + e.toString(), e);
- } catch (InterruptedException e) {
- throw new ServiceFormattedException("Error in creation: " + e.toString(), e);
- }
- job.setQueryFile(jobQueryFilePath);
-
- LOG.debug("Query file for job#" + job.getId() + ": " + jobQueryFilePath);
- }
-
- private String getRelatedSavedQueryFile() {
- SavedQuery savedQuery;
- try {
- savedQuery = savedQueryResourceManager.read(job.getQueryId());
- } catch (ItemNotFound itemNotFound) {
- throw new BadRequestFormattedException("queryId not found!", itemNotFound);
- }
- return savedQuery.getQueryFile();
- }
-
- @Override
- public boolean onModification(Object object) {
- setModified(true);
- return true;
- }
-
- @Override
- public boolean isModified() {
- return modified;
- }
-
- public void setModified(boolean modified) {
- this.modified = modified;
- }
-
- @Override
- public void clearModified() {
- setModified(false);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java
deleted file mode 100644
index 7d65957..0000000
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobImpl.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive.resources.jobs;
-
-import org.apache.commons.beanutils.PropertyUtils;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
-
-/**
- * Bean to represent saved query
- */
-public class JobImpl implements Job {
- private String title = null;
- private String queryFile = null;
- private String statusDir = null;
- private Long dateSubmitted = 0L;
- private Long duration = 0L;
- private String status = JOB_STATE_UNKNOWN;
- private String forcedContent = null;
- private String dataBase = null;
- private Integer queryId = null;
-
- private Integer id = null;
- private String owner = null;
-
- private String logFile;
- private String confFile;
-
- public JobImpl() {}
- public JobImpl(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
- for (Map.Entry<String, Object> entry : stringObjectMap.entrySet()) {
- try {
- PropertyUtils.setProperty(this, entry.getKey(), entry.getValue());
- } catch (NoSuchMethodException e) {
- //do nothing, skip
- }
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Job)) return false;
-
- JobImpl job = (JobImpl) o;
-
- if (id != null ? !id.equals(job.id) : job.id != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return id != null ? id.hashCode() : 0;
- }
-
- @Override
- public Integer getId() {
- return id;
- }
-
- @Override
- public void setId(Integer id) {
- this.id = id;
- }
-
- @Override
- public String getOwner() {
- return owner;
- }
-
- @Override
- public void setOwner(String owner) {
- this.owner = owner;
- }
-
- @Override
- public String getTitle() {
- return title;
- }
-
- @Override
- public void setTitle(String title) {
- this.title = title;
- }
-
- @Override
- public String getQueryFile() {
- return queryFile;
- }
-
- @Override
- public void setQueryFile(String queryFile) {
- this.queryFile = queryFile;
- }
-
- @Override
- public Long getDateSubmitted() {
- return dateSubmitted;
- }
-
- @Override
- public void setDateSubmitted(Long dateSubmitted) {
- this.dateSubmitted = dateSubmitted;
- }
-
- @Override
- public Long getDuration() {
- return duration;
- }
-
- @Override
- public void setDuration(Long duration) {
- this.duration = duration;
- }
-
- @Override
- public String getStatus() {
- return status;
- }
-
- @Override
- public void setStatus(String status) {
- this.status = status;
- }
-
- @Override
- public String getForcedContent() {
- return forcedContent;
- }
-
- @Override
- public void setForcedContent(String forcedContent) {
- this.forcedContent = forcedContent;
- }
-
- @Override
- public Integer getQueryId() {
- return queryId;
- }
-
- @Override
- public void setQueryId(Integer queryId) {
- this.queryId = queryId;
- }
-
- @Override
- public String getStatusDir() {
- return statusDir;
- }
-
- @Override
- public void setStatusDir(String statusDir) {
- this.statusDir = statusDir;
- }
-
- @Override
- public String getDataBase() {
- return dataBase;
- }
-
- @Override
- public void setDataBase(String dataBase) {
- this.dataBase = dataBase;
- }
-
- @Override
- public String getLogFile() {
- return logFile;
- }
-
- @Override
- public void setLogFile(String logFile) {
- this.logFile = logFile;
- }
-
- @Override
- public String getConfFile() {
- return confFile;
- }
-
- @Override
- public void setConfFile(String confFile) {
- this.confFile = confFile;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java
deleted file mode 100644
index 139b29a..0000000
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceManager.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.view.hive.resources.jobs;
-
-import org.apache.ambari.view.ViewContext;
-import org.apache.ambari.view.hive.client.*;
-import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy;
-import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive.resources.PersonalCRUDResourceManager;
-import org.apache.ambari.view.hive.utils.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * Object that provides CRUD operations for query objects
- */
-public class JobResourceManager extends PersonalCRUDResourceManager<Job> {
- private final static Logger LOG =
- LoggerFactory.getLogger(JobResourceManager.class);
-
- private JobControllerFactory jobControllerFactory;
-
- /**
- * Constructor
- * @param context View Context instance
- */
- public JobResourceManager(ViewContext context) {
- super(JobImpl.class, context);
- jobControllerFactory = JobControllerFactory.getInstance(context);
- }
-
- @Override
- public Job create(Job object) {
- super.create(object);
- JobController jobController = jobControllerFactory.createControllerForJob(object);
-
- try {
-
- jobController.afterCreation();
- saveIfModified(jobController);
-
- } catch (ServiceFormattedException e) {
- cleanupAfterErrorAndThrowAgain(object, e);
- }
-
- return object;
- }
-
- private void saveIfModified(JobController jobController) {
- if (jobController.isModified()) {
- save(jobController.getJobPOJO());
- jobController.clearModified();
- }
- }
-
-
- @Override
- public Job read(Integer id) throws ItemNotFound {
- Job job = super.read(id);
- JobController jobController = jobControllerFactory.createControllerForJob(job);
- jobController.onRead();
- saveIfModified(jobController);
- return job;
- }
-
- @Override
- public List<Job> readAll(FilteringStrategy filteringStrategy) {
- return super.readAll(filteringStrategy);
- }
-
- @Override
- public void delete(Integer resourceId) throws ItemNotFound {
- super.delete(resourceId);
- }
-
- public JobController readController(Integer id) throws ItemNotFound {
- Job job = read(id);
- return jobControllerFactory.createControllerForJob(job);
- }
-
- public Cursor getJobResultsCursor(Job job) {
- try {
- JobController jobController = jobControllerFactory.createControllerForJob(job);
- return jobController.getResults();
- } catch (ItemNotFound itemNotFound) {
- throw new NotFoundFormattedException("Job results are expired", null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java
index 780921d..460278e 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobResourceProvider.java
@@ -22,6 +22,8 @@ import com.google.inject.Inject;
import org.apache.ambari.view.*;
import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.*;
+import org.apache.ambari.view.hive.utils.SharedObjectsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +45,7 @@ public class JobResourceProvider implements ResourceProvider<Job> {
protected synchronized JobResourceManager getResourceManager() {
if (resourceManager == null) {
- resourceManager = new JobResourceManager(context);
+ resourceManager = new JobResourceManager(new SharedObjectsFactory(context), context);
}
return resourceManager;
}
@@ -51,7 +53,7 @@ public class JobResourceProvider implements ResourceProvider<Job> {
@Override
public Job getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
try {
- return getResourceManager().read(Integer.valueOf(resourceId));
+ return getResourceManager().read(resourceId);
} catch (ItemNotFound itemNotFound) {
throw new NoSuchResourceException(resourceId);
}
@@ -74,7 +76,7 @@ public class JobResourceProvider implements ResourceProvider<Job> {
throw new SystemException("error on creating resource", e);
}
getResourceManager().create(item);
- JobController jobController = JobControllerFactory.getInstance(context).createControllerForJob(item);
+ JobController jobController = new SharedObjectsFactory(context).getJobControllerFactory().createControllerForJob(item);
jobController.submit();
}
@@ -89,7 +91,7 @@ public class JobResourceProvider implements ResourceProvider<Job> {
throw new SystemException("error on updating resource", e);
}
try {
- getResourceManager().update(item, Integer.valueOf(resourceId));
+ getResourceManager().update(item, resourceId);
} catch (ItemNotFound itemNotFound) {
throw new NoSuchResourceException(resourceId);
}
@@ -99,7 +101,7 @@ public class JobResourceProvider implements ResourceProvider<Job> {
@Override
public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
try {
- getResourceManager().delete(Integer.valueOf(resourceId));
+ getResourceManager().delete(resourceId);
} catch (ItemNotFound itemNotFound) {
throw new NoSuchResourceException(resourceId);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java
index 1c4f2a9..d9c69e8 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/JobService.java
@@ -24,7 +24,11 @@ import org.apache.ambari.view.hive.BaseService;
import org.apache.ambari.view.hive.backgroundjobs.BackgroundJobController;
import org.apache.ambari.view.hive.client.Cursor;
import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
-import org.apache.ambari.view.hive.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegate;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSRequestsDelegateImpl;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.ATSParser;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.*;
import org.apache.ambari.view.hive.utils.*;
import org.apache.ambari.view.hive.utils.HdfsApi;
import org.apache.commons.beanutils.PropertyUtils;
@@ -59,16 +63,25 @@ public class JobService extends BaseService {
ViewResourceHandler handler;
protected JobResourceManager resourceManager;
+ private IOperationHandleResourceManager opHandleResourceManager;
protected final static Logger LOG =
LoggerFactory.getLogger(JobService.class);
protected synchronized JobResourceManager getResourceManager() {
if (resourceManager == null) {
- resourceManager = new JobResourceManager(context);
+ SharedObjectsFactory connectionsFactory = getSharedObjectsFactory();
+ resourceManager = new JobResourceManager(connectionsFactory, context);
}
return resourceManager;
}
+ private IOperationHandleResourceManager getOperationHandleResourceManager() {
+ if (opHandleResourceManager == null) {
+ opHandleResourceManager = new OperationHandleResourceManager(getSharedObjectsFactory());
+ }
+ return opHandleResourceManager;
+ }
+
/**
* Get single item
*/
@@ -77,7 +90,7 @@ public class JobService extends BaseService {
@Produces(MediaType.APPLICATION_JSON)
public Response getOne(@PathParam("jobId") String jobId) {
try {
- JobController jobController = getResourceManager().readController(Integer.valueOf(jobId));
+ JobController jobController = getResourceManager().readController(jobId);
JSONObject jsonJob = jsonObjectFromJob(jobController);
@@ -110,7 +123,7 @@ public class JobService extends BaseService {
@Context HttpServletResponse response,
@QueryParam("columns") final String requestedColumns) {
try {
- JobController jobController = getResourceManager().readController(Integer.valueOf(jobId));
+ JobController jobController = getResourceManager().readController(jobId);
final Cursor resultSet = jobController.getResults();
resultSet.selectColumns(requestedColumns);
@@ -153,7 +166,7 @@ public class JobService extends BaseService {
@QueryParam("columns") final String requestedColumns,
@Context HttpServletResponse response) {
try {
- final JobController jobController = getResourceManager().readController(Integer.valueOf(jobId));
+ final JobController jobController = getResourceManager().readController(jobId);
String backgroundJobId = "csv" + String.valueOf(jobController.getJob().getId());
if (commence != null && commence.equals("true")) {
@@ -167,7 +180,7 @@ public class JobService extends BaseService {
Cursor resultSet = jobController.getResults();
resultSet.selectColumns(requestedColumns);
- FSDataOutputStream stream = HdfsApi.getInstance(context).create(targetFile, true);
+ FSDataOutputStream stream = getSharedObjectsFactory().getHdfsApi().create(targetFile, true);
Writer writer = new BufferedWriter(new OutputStreamWriter(stream));
CSVPrinter csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT);
try {
@@ -225,7 +238,7 @@ public class JobService extends BaseService {
@QueryParam("searchId") String searchId,
@QueryParam("columns") final String requestedColumns) {
try {
- final JobController jobController = getResourceManager().readController(Integer.valueOf(jobId));
+ final JobController jobController = getResourceManager().readController(jobId);
return ResultsPaginationController.getInstance(context)
.request(jobId, searchId, true, fromBeginning, count,
@@ -276,13 +289,13 @@ public class JobService extends BaseService {
try {
JobController jobController;
try {
- jobController = getResourceManager().readController(Integer.valueOf(id));
+ jobController = getResourceManager().readController(id);
} catch (ItemNotFound itemNotFound) {
throw new NotFoundFormattedException(itemNotFound.getMessage(), itemNotFound);
}
jobController.cancel();
if (remove != null && remove.compareTo("true") == 0) {
- getResourceManager().delete(Integer.valueOf(id));
+ getResourceManager().delete(id);
}
// getResourceManager().delete(Integer.valueOf(queryId));
return Response.status(204).build();
@@ -303,8 +316,10 @@ public class JobService extends BaseService {
public Response getList() {
try {
LOG.debug("Getting all job");
- List allJobs = getResourceManager().readAll(
- new OnlyOwnersFilteringStrategy(this.context.getUsername())); //TODO: move strategy to PersonalCRUDRM
+ ATSRequestsDelegate transport = new ATSRequestsDelegateImpl(context, "http://127.0.0.1:8188");
+ IATSParser atsParser = new ATSParser(transport);
+ Aggregator aggregator = new Aggregator(getResourceManager(), getOperationHandleResourceManager(), atsParser);
+ List allJobs = aggregator.readAll(context.getUsername());
JSONObject object = new JSONObject();
object.put("jobs", allJobs);
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java
index 090781c..3952491 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/LogParser.java
@@ -23,29 +23,35 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class LogParser {
- public static final Pattern HADOOP_MR_JOBS_RE = Pattern.compile("(http[^\\s]*/proxy/([a-z0-9_]+?)/)");
- public static final Pattern HADOOP_TEZ_JOBS_RE = Pattern.compile("\\(Executing on YARN cluster with App id ([a-z0-9_]+?)\\)");
- private LinkedHashSet<JobId> jobsList;
+ public static final Pattern HADOOP_MR_APPS_RE = Pattern.compile("(http[^\\s]*/proxy/([a-z0-9_]+?)/)");
+ public static final Pattern HADOOP_TEZ_APPS_RE = Pattern.compile("\\(Executing on YARN cluster with App id ([a-z0-9_]+?)\\)");
+ private LinkedHashSet<AppId> appsList;
+
+ private LogParser() {}
public static LogParser parseLog(String logs) {
LogParser parser = new LogParser();
- LinkedHashSet<JobId> mrJobIds = getMRJobIds(logs);
- LinkedHashSet<JobId> tezJobIds = getTezJobIds(logs);
+ parser.setAppsList(parseApps(logs, parser));
+ return parser;
+ }
- LinkedHashSet<JobId> jobIds = new LinkedHashSet<JobId>();
- jobIds.addAll(mrJobIds);
- jobIds.addAll(tezJobIds);
+ public static LinkedHashSet<AppId> parseApps(String logs, LogParser parser) {
+ LinkedHashSet<AppId> mrAppIds = getMRAppIds(logs);
+ LinkedHashSet<AppId> tezAppIds = getTezAppIds(logs);
- parser.setJobsList(jobIds);
- return parser;
+ LinkedHashSet<AppId> appIds = new LinkedHashSet<AppId>();
+ appIds.addAll(mrAppIds);
+ appIds.addAll(tezAppIds);
+
+ return appIds;
}
- private static LinkedHashSet<JobId> getMRJobIds(String logs) {
- Matcher m = HADOOP_MR_JOBS_RE.matcher(logs);
- LinkedHashSet<JobId> list = new LinkedHashSet<JobId>();
+ private static LinkedHashSet<AppId> getMRAppIds(String logs) {
+ Matcher m = HADOOP_MR_APPS_RE.matcher(logs);
+ LinkedHashSet<AppId> list = new LinkedHashSet<AppId>();
while (m.find()) {
- JobId applicationInfo = new JobId();
+ AppId applicationInfo = new AppId();
applicationInfo.setTrackingUrl(m.group(1));
applicationInfo.setIdentifier(m.group(2));
list.add(applicationInfo);
@@ -53,27 +59,34 @@ public class LogParser {
return list;
}
- private static LinkedHashSet<JobId> getTezJobIds(String logs) {
- Matcher m = HADOOP_TEZ_JOBS_RE.matcher(logs);
- LinkedHashSet<JobId> list = new LinkedHashSet<JobId>();
+ private static LinkedHashSet<AppId> getTezAppIds(String logs) {
+ Matcher m = HADOOP_TEZ_APPS_RE.matcher(logs);
+ LinkedHashSet<AppId> list = new LinkedHashSet<AppId>();
while (m.find()) {
- JobId applicationInfo = new JobId();
- applicationInfo.setTrackingUrl(null);
+ AppId applicationInfo = new AppId();
+ applicationInfo.setTrackingUrl("");
applicationInfo.setIdentifier(m.group(1));
list.add(applicationInfo);
}
return list;
}
- public void setJobsList(LinkedHashSet<JobId> jobsList) {
- this.jobsList = jobsList;
+ public void setAppsList(LinkedHashSet<AppId> appsList) {
+ this.appsList = appsList;
+ }
+
+ public LinkedHashSet<AppId> getAppsList() {
+ return appsList;
}
- public LinkedHashSet<JobId> getJobsList() {
- return jobsList;
+ public AppId getLastAppInList() {
+ Object[] appIds = appsList.toArray();
+ if (appIds.length == 0)
+ return null;
+ return (AppId) appIds[appsList.size()-1];
}
- public static class JobId {
+ public static class AppId {
private String trackingUrl;
private String identifier;
@@ -96,11 +109,11 @@ public class LogParser {
@Override
public boolean equals(Object o) {
if (this == o) return true;
- if (!(o instanceof JobId)) return false;
+ if (!(o instanceof AppId)) return false;
- JobId jobId = (JobId) o;
+ AppId appId = (AppId) o;
- if (!identifier.equals(jobId.identifier)) return false;
+ if (!identifier.equals(appId.identifier)) return false;
return true;
}
@@ -110,4 +123,16 @@ public class LogParser {
return identifier.hashCode();
}
}
+
+ public static class EmptyAppId extends AppId {
+ @Override
+ public String getTrackingUrl() {
+ return "";
+ }
+
+ @Override
+ public String getIdentifier() {
+ return "";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java
index 551ebdd..e146d55 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleController.java
@@ -19,10 +19,10 @@
package org.apache.ambari.view.hive.resources.jobs;
-import org.apache.ambari.view.ViewContext;
-import org.apache.ambari.view.hive.client.ConnectionPool;
import org.apache.ambari.view.hive.client.Cursor;
import org.apache.ambari.view.hive.client.HiveClientException;
+import org.apache.ambari.view.hive.client.IConnectionFactory;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job;
import org.apache.ambari.view.hive.utils.ServiceFormattedException;
import org.apache.hive.service.cli.thrift.TGetOperationStatusResp;
import org.apache.hive.service.cli.thrift.TOperationHandle;
@@ -33,18 +33,16 @@ public class OperationHandleController {
private final static Logger LOG =
LoggerFactory.getLogger(OperationHandleController.class);
- private ViewContext context;
+ private IConnectionFactory connectionsFabric;
private TOperationHandle operationHandle;
- private OperationHandleResourceManager operationHandlesStorage;
+ private IOperationHandleResourceManager operationHandlesStorage;
- public OperationHandleController(ViewContext context, TOperationHandle storedOperationHandle, OperationHandleResourceManager operationHandlesStorage) {
- this.context = context;
+ public OperationHandleController(IConnectionFactory connectionsFabric, TOperationHandle storedOperationHandle, IOperationHandleResourceManager operationHandlesStorage) {
+ this.connectionsFabric = connectionsFabric;
this.operationHandle = storedOperationHandle;
this.operationHandlesStorage = operationHandlesStorage;
}
-
-
public TOperationHandle getStoredOperationHandle() {
return operationHandle;
}
@@ -54,7 +52,7 @@ public class OperationHandleController {
}
public String getOperationStatus() throws NoOperationStatusSetException, HiveClientException {
- TGetOperationStatusResp statusResp = ConnectionPool.getConnection(context).getOperationStatus(operationHandle);
+ TGetOperationStatusResp statusResp = connectionsFabric.getHiveConnection().getOperationStatus(operationHandle);
if (!statusResp.isSetOperationState()) {
throw new NoOperationStatusSetException("Operation state is not set");
}
@@ -93,7 +91,7 @@ public class OperationHandleController {
public void cancel() {
try {
- ConnectionPool.getConnection(context).cancelOperation(operationHandle);
+ connectionsFabric.getHiveConnection().cancelOperation(operationHandle);
} catch (HiveClientException e) {
throw new ServiceFormattedException("Cancel failed: " + e.toString(), e);
}
@@ -104,10 +102,10 @@ public class OperationHandleController {
}
public String getLogs() {
- return ConnectionPool.getConnection(context).getLogs(operationHandle);
+ return connectionsFabric.getHiveConnection().getLogs(operationHandle);
}
public Cursor getResults() {
- return ConnectionPool.getConnection(context).getResults(operationHandle);
+ return connectionsFabric.getHiveConnection().getResults(operationHandle);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java
index 5d4a8af..0310855 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleControllerFactory.java
@@ -18,31 +18,22 @@
package org.apache.ambari.view.hive.resources.jobs;
-import org.apache.ambari.view.ViewContext;
import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job;
+import org.apache.ambari.view.hive.utils.SharedObjectsFactory;
import org.apache.hive.service.cli.thrift.TOperationHandle;
-import java.util.HashMap;
-import java.util.Map;
-
public class OperationHandleControllerFactory {
- private ViewContext context;
- private OperationHandleResourceManager operationHandlesStorage;
-
- private OperationHandleControllerFactory(ViewContext context) {
- this.context = context;
- operationHandlesStorage = new OperationHandleResourceManager(context);
- }
+ private SharedObjectsFactory connectionsFabric;
+ private IOperationHandleResourceManager operationHandlesStorage;
- private static Map<String, OperationHandleControllerFactory> viewSingletonObjects = new HashMap<String, OperationHandleControllerFactory>();
- public static OperationHandleControllerFactory getInstance(ViewContext context) {
- if (!viewSingletonObjects.containsKey(context.getInstanceName()))
- viewSingletonObjects.put(context.getInstanceName(), new OperationHandleControllerFactory(context));
- return viewSingletonObjects.get(context.getInstanceName());
+ public OperationHandleControllerFactory(SharedObjectsFactory connectionsFabric) {
+ this.connectionsFabric = connectionsFabric;
+ operationHandlesStorage = new OperationHandleResourceManager(connectionsFabric);
}
public OperationHandleController createControllerForHandle(TOperationHandle storedOperationHandle) {
- return new OperationHandleController(context, storedOperationHandle, operationHandlesStorage);
+ return new OperationHandleController(connectionsFabric, storedOperationHandle, operationHandlesStorage);
}
public OperationHandleController getHandleForJob(Job job) throws ItemNotFound {
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java
index cffed38..5004677 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/OperationHandleResourceManager.java
@@ -18,42 +18,43 @@
package org.apache.ambari.view.hive.resources.jobs;
-import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive.persistence.IStorageFactory;
import org.apache.ambari.view.hive.persistence.utils.FilteringStrategy;
import org.apache.ambari.view.hive.persistence.utils.Indexed;
import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
import org.apache.ambari.view.hive.resources.SharedCRUDResourceManager;
+import org.apache.ambari.view.hive.resources.jobs.viewJobs.Job;
import org.apache.ambari.view.hive.utils.ServiceFormattedException;
import org.apache.hive.service.cli.thrift.TOperationHandle;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.util.List;
-public class OperationHandleResourceManager extends SharedCRUDResourceManager<StoredOperationHandle> {
+public class OperationHandleResourceManager extends SharedCRUDResourceManager<StoredOperationHandle>
+ implements IOperationHandleResourceManager {
/**
* Constructor
- *
- * @param context View Context instance
*/
- public OperationHandleResourceManager(ViewContext context) {
- super(StoredOperationHandle.class, context);
+ public OperationHandleResourceManager(IStorageFactory storageFabric) {
+ super(StoredOperationHandle.class, storageFabric);
}
+ @Override
public List<StoredOperationHandle> readJobRelatedHandles(final Job job) {
- try {
- return getStorage().loadWhere(StoredOperationHandle.class, "jobId = " + job.getId());
- } catch (NotImplementedException e) {
- // fallback to filtering strategy
- return getStorage().loadAll(StoredOperationHandle.class, new FilteringStrategy() {
- @Override
- public boolean isConform(Indexed item) {
- StoredOperationHandle handle = (StoredOperationHandle) item;
- return (handle.getJobId() != null && handle.getJobId().equals(job.getId()));
- }
- });
- }
+ return storageFabric.getStorage().loadAll(StoredOperationHandle.class, new FilteringStrategy() {
+ @Override
+ public boolean isConform(Indexed item) {
+ StoredOperationHandle handle = (StoredOperationHandle) item;
+ return (handle.getJobId() != null && handle.getJobId().equals(job.getId()));
+ }
+
+ @Override
+ public String whereStatement() {
+ return "jobId = '" + job.getId() + "'";
+ }
+ });
}
+ @Override
public void putHandleForJob(TOperationHandle h, Job job) {
StoredOperationHandle handle = StoredOperationHandle.buildFromTOperationHandle(h);
handle.setJobId(job.getId());
@@ -71,11 +72,13 @@ public class OperationHandleResourceManager extends SharedCRUDResourceManager<St
}
}
+ @Override
public boolean containsHandleForJob(Job job) {
List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job);
return jobRelatedHandles.size() > 0;
}
+ @Override
public TOperationHandle getHandleForJob(Job job) throws ItemNotFound {
List<StoredOperationHandle> jobRelatedHandles = readJobRelatedHandles(job);
if (jobRelatedHandles.size() == 0)
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java
index 193b226..1d3f6e0 100644
--- a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/StoredOperationHandle.java
@@ -40,9 +40,9 @@ public class StoredOperationHandle implements Indexed {
private String guid;
private String secret;
- private Integer jobId;
+ private String jobId;
- private Integer id;
+ private String id;
public StoredOperationHandle() {}
public StoredOperationHandle(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
@@ -126,21 +126,21 @@ public class StoredOperationHandle implements Indexed {
this.secret = secret;
}
- public Integer getJobId() {
+ public String getJobId() {
return jobId;
}
- public void setJobId(Integer jobId) {
+ public void setJobId(String jobId) {
this.jobId = jobId;
}
@Override
- public Integer getId() {
+ public String getId() {
return id;
}
@Override
- public void setId(Integer id) {
+ public void setId(String id) {
this.id = id;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
new file mode 100644
index 0000000..b644d4c
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParser.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.hive.utils.ServiceFormattedException;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class ATSParser implements IATSParser {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(ATSParser.class);
+
+ private ATSRequestsDelegate delegate;
+
+ private static final long MillisInSecond = 1000L;
+
+ public ATSParser(ATSRequestsDelegate delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public List<HiveQueryId> getHiveQuieryIdsList(String username) {
+ JSONObject entities = delegate.hiveQueryIdList(username);
+ JSONArray jobs = (JSONArray) entities.get("entities");
+
+ List<HiveQueryId> parsedJobs = new LinkedList<HiveQueryId>();
+ for(Object job : jobs) {
+ try {
+ HiveQueryId parsedJob = parseAtsHiveJob((JSONObject) job);
+ parsedJobs.add(parsedJob);
+ } catch (Exception ex) {
+ LOG.error("Error while parsing ATS job", ex);
+ }
+ }
+
+ return parsedJobs;
+ }
+
+ @Override
+ public HiveQueryId getHiveQuieryIdByOperationId(byte[] guid) {
+ String guidString = new String(guid);
+ JSONObject entities = delegate.hiveQueryIdByOperationId(guidString);
+ JSONArray jobs = (JSONArray) entities.get("entities");
+
+ assert jobs.size() <= 1;
+ if (jobs.size() == 0) {
+ //TODO: throw appropriate exception
+ throw new ServiceFormattedException("HIVE_QUERY_ID with operationid=" + guidString + " not found");
+ }
+
+ return parseAtsHiveJob((JSONObject) jobs.get(0));
+ }
+
+ @Override
+ public TezDagId getTezDAGByName(String name) {
+ JSONArray tezDagEntities = (JSONArray) delegate.tezDagByName(name).get("entities");
+ assert tezDagEntities.size() <= 1;
+ if (tezDagEntities.size() == 0) {
+ return new TezDagId();
+ }
+ JSONObject tezDagEntity = (JSONObject) tezDagEntities.get(0);
+
+ TezDagId parsedDag = new TezDagId();
+ JSONArray applicationIds = (JSONArray) ((JSONObject) tezDagEntity.get("primaryfilters")).get("applicationId");
+ parsedDag.applicationId = (String) applicationIds.get(0);
+ parsedDag.status = (String) ((JSONObject) tezDagEntity.get("otherinfo")).get("status");
+ return parsedDag;
+ }
+
+ private HiveQueryId parseAtsHiveJob(JSONObject job) {
+ HiveQueryId parsedJob = new HiveQueryId();
+
+ parsedJob.entity = (String) job.get("entity");
+ parsedJob.starttime = ((Long) job.get("starttime")) / MillisInSecond;
+
+ JSONObject primaryfilters = (JSONObject) job.get("primaryfilters");
+ JSONArray operationIds = (JSONArray) primaryfilters.get("operationid");
+ if (operationIds != null) {
+ parsedJob.operationId = (String) (operationIds).get(0);
+ }
+ JSONArray users = (JSONArray) primaryfilters.get("user");
+ if (users != null) {
+ parsedJob.user = (String) (users).get(0);
+ }
+
+ JSONObject lastEvent = getLastEvent(job);
+ long lastEventTimestamp = ((Long) lastEvent.get("timestamp")) / MillisInSecond;
+
+ parsedJob.duration = lastEventTimestamp - parsedJob.starttime;
+
+ JSONObject otherinfo = (JSONObject) job.get("otherinfo");
+ JSONObject query = (JSONObject) JSONValue.parse((String) otherinfo.get("QUERY"));
+
+ parsedJob.query = (String) query.get("queryText");
+ JSONObject stages = (JSONObject) ((JSONObject) query.get("queryPlan")).get("STAGE PLANS");
+
+ List<String> dagIds = new LinkedList<String>();
+ List<JSONObject> stagesList = new LinkedList<JSONObject>();
+
+ for (Object key : stages.keySet()) {
+ JSONObject stage = (JSONObject) stages.get(key);
+ if (stage.get("Tez") != null) {
+ String dagId = (String) ((JSONObject) stage.get("Tez")).get("DagName:");
+ dagIds.add(dagId);
+ }
+ stagesList.add(stage);
+ }
+ parsedJob.dagNames = dagIds;
+ parsedJob.stages = stagesList;
+ return parsedJob;
+ }
+
+ private JSONObject getLastEvent(JSONObject atsEntity) {
+ JSONArray events = (JSONArray) atsEntity.get("events");
+ return (JSONObject) events.get(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java
new file mode 100644
index 0000000..f5e9bcf
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSParserFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.ViewContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ATSParserFactory {
+
+ private ViewContext context;
+
+ public ATSParserFactory(ViewContext context) {
+ this.context = context;
+ }
+
+ public ATSParser getATSParser() {
+ ATSRequestsDelegateImpl delegate = new ATSRequestsDelegateImpl(context, getATSUrl(context));
+ return new ATSParser(delegate);
+ }
+
+ public static String getATSUrl(ViewContext context) {
+ return context.getProperties().get("yarn.ats.url");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
new file mode 100644
index 0000000..3aa07d4
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegate.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+import org.json.simple.JSONObject;
+
+public interface ATSRequestsDelegate {
+ JSONObject hiveQueryIdList(String username);
+
+ JSONObject hiveQueryIdByOperationId(String operationId);
+
+ JSONObject tezDagByName(String name);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
new file mode 100644
index 0000000..047bd63
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/ATSRequestsDelegateImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+
+public class ATSRequestsDelegateImpl implements ATSRequestsDelegate {
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(ATSRequestsDelegateImpl.class);
+ public static final String EMPTY_ENTITIES_JSON = "{ \"entities\" : [ ] }";
+
+ private ViewContext context;
+ private String atsUrl;
+
+ public ATSRequestsDelegateImpl(ViewContext context, String atsUrl) {
+ this.context = context;
+ this.atsUrl = atsUrl;
+ }
+
+ @Override
+ public JSONObject hiveQueryIdList(String username) {
+ String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=requestuser:" + username;
+ String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }");
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ @Override
+ public JSONObject hiveQueryIdByOperationId(String operationId) {
+ String hiveQueriesListUrl = atsUrl + "/ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=operationid:" + operationId;
+ String response = readFromWithDefault(hiveQueriesListUrl, "{ \"entities\" : [ ] }");
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ @Override
+ public JSONObject tezDagByName(String name) {
+ String tezDagUrl = atsUrl + "/ws/v1/timeline/TEZ_DAG_ID?primaryFilter=dagName:" + name;
+ String response = readFromWithDefault(tezDagUrl, EMPTY_ENTITIES_JSON);
+ return (JSONObject) JSONValue.parse(response);
+ }
+
+ protected String readFromWithDefault(String hiveQueriesListUrl, String defaultResponse) {
+ String response;
+ try {
+ InputStream responseInputStream = context.getURLStreamProvider().readFrom(hiveQueriesListUrl, "GET",
+ null, new HashMap<String, String>());
+ response = IOUtils.toString(responseInputStream);
+ } catch (IOException e) {
+ LOG.error("Error while reading from ATS", e);
+ response = defaultResponse;
+ }
+ return response;
+ }
+
+ public String getAtsUrl() {
+ return atsUrl;
+ }
+
+ public void setAtsUrl(String atsUrl) {
+ this.atsUrl = atsUrl;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
new file mode 100644
index 0000000..edb726b
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/HiveQueryId.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+import org.json.simple.JSONObject;
+
+import java.util.List;
+
+public class HiveQueryId {
+ public String entity;
+ public String query;
+
+ public List<String> dagNames;
+
+ public List<JSONObject> stages;
+
+ public long starttime;
+ public long duration;
+ public String operationId;
+ public String user;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
new file mode 100644
index 0000000..d029fdc
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/IATSParser.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+import java.util.List;
+
+public interface IATSParser {
+ List<HiveQueryId> getHiveQuieryIdsList(String username);
+
+ HiveQueryId getHiveQuieryIdByOperationId(byte[] guid);
+
+ TezDagId getTezDAGByName(String name);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java
new file mode 100644
index 0000000..061c51c
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/atsJobs/TezDagId.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.atsJobs;
+
+public class TezDagId {
+ public static final String STATUS_UNKNOWN = "UNKNOWN";
+ public String applicationId = "";
+ public String dagName = "";
+ public String status = STATUS_UNKNOWN;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java
new file mode 100644
index 0000000..89fbb85
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/IJobControllerFactory.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+public interface IJobControllerFactory {
+ JobController createControllerForJob(Job job);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java
new file mode 100644
index 0000000..004932c
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/Job.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+
+import org.apache.ambari.view.hive.persistence.utils.Indexed;
+import org.apache.ambari.view.hive.persistence.utils.PersonalResource;
+
+import java.io.Serializable;
+
+/**
+ * Interface for Job bean to create Proxy for it
+ */
+public interface Job extends Serializable,Indexed,PersonalResource {
+ public static final String JOB_STATE_UNKNOWN = "Unknown";
+ public static final String JOB_STATE_INITIALIZED = "Initialized";
+ public static final String JOB_STATE_RUNNING = "Running";
+ public static final String JOB_STATE_FINISHED = "Succeeded";
+ public static final String JOB_STATE_CANCELED = "Canceled";
+ public static final String JOB_STATE_CLOSED = "Closed";
+ public static final String JOB_STATE_ERROR = "Error";
+ public static final String JOB_STATE_PENDING = "Pending";
+
+ String getId();
+
+ void setId(String id);
+
+ String getOwner();
+
+ void setOwner(String owner);
+
+ String getTitle();
+
+ void setTitle(String title);
+
+ String getQueryFile();
+
+ void setQueryFile(String queryFile);
+
+ Long getDateSubmitted();
+
+ void setDateSubmitted(Long dateSubmitted);
+
+ Long getDuration();
+
+ void setDuration(Long duration);
+
+ String getStatus();
+
+ void setStatus(String status);
+
+ String getForcedContent();
+
+ void setForcedContent(String forcedContent);
+
+ String getQueryId();
+
+ void setQueryId(String queryId);
+
+ String getStatusDir();
+
+ void setStatusDir(String statusDir);
+
+ String getDataBase();
+
+ void setDataBase(String dataBase);
+
+ String getLogFile();
+
+ void setLogFile(String logFile);
+
+ String getConfFile();
+
+ void setConfFile(String confFile);
+
+ String getApplicationId();
+
+ void setApplicationId(String applicationId);
+
+ String getDagName();
+
+ void setDagName(String DagName);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java
new file mode 100644
index 0000000..339e194
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobController.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.hive.client.Cursor;
+import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
+
+public interface JobController {
+ void submit();
+
+ void cancel() throws ItemNotFound;
+
+ Job getJob();
+
+ /**
+ * Use carefully. Returns unproxied bean object
+ * @return unproxied bean object
+ */
+ Job getJobPOJO();
+
+ Cursor getResults() throws ItemNotFound;
+
+ void afterCreation();
+
+ void update();
+
+ boolean isModified();
+
+ void clearModified();
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java
new file mode 100644
index 0000000..12d1cdb
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive.utils.SharedObjectsFactory;
+
+public class JobControllerFactory implements IJobControllerFactory {
+ private SharedObjectsFactory sharedObjectsFactory;
+ private ViewContext context;
+
+ public JobControllerFactory(ViewContext context, SharedObjectsFactory sharedObjectsFactory) {
+ this.sharedObjectsFactory = sharedObjectsFactory;
+ this.context = context;
+ }
+
+ @Override
+ public JobController createControllerForJob(Job job) {
+ return new JobControllerImpl(context, job,
+ sharedObjectsFactory.getHiveConnectionController(),
+ sharedObjectsFactory.getOperationHandleControllerFactory(),
+ sharedObjectsFactory.getSavedQueryResourceManager(),
+ sharedObjectsFactory.getATSParser(),
+ sharedObjectsFactory.getHdfsApi());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/8dbdbf66/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java
----------------------------------------------------------------------
diff --git a/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java
new file mode 100644
index 0000000..a100f3d
--- /dev/null
+++ b/contrib/views/hive/src/main/java/org/apache/ambari/view/hive/resources/jobs/viewJobs/JobControllerImpl.java
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.hive.resources.jobs.viewJobs;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.hive.client.*;
+import org.apache.ambari.view.hive.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.hive.resources.jobs.*;
+import org.apache.ambari.view.hive.resources.jobs.atsJobs.IATSParser;
+import org.apache.ambari.view.hive.resources.savedQueries.SavedQuery;
+import org.apache.ambari.view.hive.resources.savedQueries.SavedQueryResourceManager;
+import org.apache.ambari.view.hive.utils.*;
+import org.apache.ambari.view.hive.utils.HdfsApi;
+import org.apache.ambari.view.hive.utils.HdfsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class JobControllerImpl implements JobController, ModifyNotificationDelegate {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(JobControllerImpl.class);
+
+ private ViewContext context;
+ private HdfsApi hdfsApi;
+ private Job jobUnproxied;
+ private Job job;
+ private boolean modified;
+
+ private OperationHandleControllerFactory opHandleControllerFactory;
+ private ConnectionController hiveSession;
+ private SavedQueryResourceManager savedQueryResourceManager;
+ private IATSParser atsParser;
+
+ /**
+ * JobController constructor
+ * Warning: Create JobControllers ONLY using JobControllerFactory!
+ */
+ public JobControllerImpl(ViewContext context, Job job,
+ ConnectionController hiveSession,
+ OperationHandleControllerFactory opHandleControllerFactory,
+ SavedQueryResourceManager savedQueryResourceManager,
+ IATSParser atsParser,
+ HdfsApi hdfsApi) {
+ this.context = context;
+ setJobPOJO(job);
+ this.opHandleControllerFactory = opHandleControllerFactory;
+ this.hiveSession = hiveSession;
+ this.savedQueryResourceManager = savedQueryResourceManager;
+ this.atsParser = atsParser;
+ this.hdfsApi = hdfsApi;
+ }
+
+ public String getQueryForJob() {
+ FilePaginator paginator = new FilePaginator(job.getQueryFile(), hdfsApi);
+ String query;
+ try {
+ query = paginator.readPage(0); //warning - reading only 0 page restricts size of query to 1MB
+ } catch (IOException e) {
+ throw new ServiceFormattedException("Error when reading file: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ throw new ServiceFormattedException("Error when reading file: " + e.toString(), e);
+ }
+ return query;
+ }
+
+ private static final String DEFAULT_DB = "default";
+ public String getJobDatabase() {
+ if (job.getDataBase() != null) {
+ return job.getDataBase();
+ } else {
+ return DEFAULT_DB;
+ }
+ }
+
+ @Override
+ public void submit() {
+ setupHiveBeforeQueryExecute();
+
+ String query = getQueryForJob();
+ OperationHandleController handleController = hiveSession.executeQuery(query);
+
+ handleController.persistHandleForJob(job);
+
+// atsParser.getHiveQuieryIdsList()
+ }
+
+ private void setupHiveBeforeQueryExecute() {
+ String database = getJobDatabase();
+ hiveSession.selectDatabase(database);
+ }
+
+ @Override
+ public void cancel() throws ItemNotFound {
+ OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+ handle.cancel();
+ }
+
+ @Override
+ public void update() {
+ updateOperationStatus();
+ updateOperationLogs();
+
+ updateJobDuration();
+ }
+
+ public void updateOperationStatus() {
+ try {
+
+ OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+ String status = handle.getOperationStatus();
+ job.setStatus(status);
+ LOG.debug("Status of job#" + job.getId() + " is " + job.getStatus());
+
+ } catch (NoOperationStatusSetException e) {
+ LOG.info("Operation state is not set for job#" + job.getId());
+
+ } catch (HiveErrorStatusException e) {
+ LOG.debug("Error updating status for job#" + job.getId() + ": " + e.getMessage());
+ job.setStatus(Job.JOB_STATE_UNKNOWN);
+
+ } catch (HiveClientException e) {
+ throw new ServiceFormattedException("Could not fetch job status " + job.getId(), e);
+
+ } catch (ItemNotFound itemNotFound) {
+ LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't update status");
+ }
+ }
+
+ public void updateOperationLogs() {
+ try {
+ OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+ String logs = handle.getLogs();
+
+ LogParser info = LogParser.parseLog(logs);
+ LogParser.AppId app = info.getLastAppInList();
+ if (app != null) {
+ job.setApplicationId(app.getIdentifier());
+ }
+
+ String logFilePath = job.getLogFile();
+ HdfsUtil.putStringToFile(hdfsApi, logFilePath, logs);
+
+ } catch (HiveClientRuntimeException ex) {
+ LOG.error("Error while fetching logs: " + ex.getMessage());
+ } catch (ItemNotFound itemNotFound) {
+ LOG.debug("No TOperationHandle for job#" + job.getId() + ", can't read logs");
+ }
+ }
+
+ public boolean isJobEnded() {
+ String status = job.getStatus();
+ return status.equals(Job.JOB_STATE_FINISHED) || status.equals(Job.JOB_STATE_CANCELED) ||
+ status.equals(Job.JOB_STATE_CLOSED) || status.equals(Job.JOB_STATE_ERROR) ||
+ status.equals(Job.JOB_STATE_UNKNOWN); // Unknown is not finished, but polling makes no sense
+ }
+
+ @Override
+ public Job getJob() {
+ return job;
+ }
+
+ /**
+ * Use carefully. Returns unproxied bean object
+ * @return unproxied bean object
+ */
+ @Override
+ public Job getJobPOJO() {
+ return jobUnproxied;
+ }
+
+ public void setJobPOJO(Job jobPOJO) {
+ Job jobModifyNotificationProxy = (Job) Proxy.newProxyInstance(jobPOJO.getClass().getClassLoader(),
+ new Class[]{Job.class},
+ new ModifyNotificationInvocationHandler(jobPOJO, this));
+ this.job = jobModifyNotificationProxy;
+
+ this.jobUnproxied = jobPOJO;
+ }
+
+ @Override
+ public Cursor getResults() throws ItemNotFound {
+ OperationHandleController handle = opHandleControllerFactory.getHandleForJob(job);
+ return handle.getResults();
+ }
+
+ @Override
+ public void afterCreation() {
+ setupStatusDirIfNotPresent();
+ setupQueryFileIfNotPresent();
+ setupLogFileIfNotPresent();
+
+ setCreationDate();
+ }
+
+ public void setupLogFileIfNotPresent() {
+ if (job.getLogFile() == null || job.getLogFile().isEmpty()) {
+ setupLogFile();
+ }
+ }
+
+ public void setupQueryFileIfNotPresent() {
+ if (job.getQueryFile() == null || job.getQueryFile().isEmpty()) {
+ setupQueryFile();
+ }
+ }
+
+ public void setupStatusDirIfNotPresent() {
+ if (job.getStatusDir() == null || job.getStatusDir().isEmpty()) {
+ setupStatusDir();
+ }
+ }
+
+ private static final long MillisInSecond = 1000L;
+
+ public void updateJobDuration() {
+ job.setDuration(System.currentTimeMillis() / MillisInSecond - job.getDateSubmitted());
+ }
+
+ public void setCreationDate() {
+ job.setDateSubmitted(System.currentTimeMillis() / MillisInSecond);
+ }
+
+
+
+ private void setupLogFile() {
+ LOG.debug("Creating log file for job#" + job.getId());
+
+ String logFile = job.getStatusDir() + "/" + "logs";
+ HdfsUtil.putStringToFile(hdfsApi, logFile, "");
+
+ job.setLogFile(logFile);
+ LOG.debug("Log file for job#" + job.getId() + ": " + logFile);
+ }
+
+ private void setupStatusDir() {
+ String newDirPrefix = makeStatusDirectoryPrefix();
+ String newDir = HdfsUtil.findUnallocatedFileName(hdfsApi, newDirPrefix, "");
+
+ job.setStatusDir(newDir);
+ LOG.debug("Status dir for job#" + job.getId() + ": " + newDir);
+ }
+
+ private String makeStatusDirectoryPrefix() {
+ String userScriptsPath = context.getProperties().get("jobs.dir");
+
+ if (userScriptsPath == null) { // TODO: move check to initialization code
+ String msg = "jobs.dir is not configured!";
+ LOG.error(msg);
+ throw new MisconfigurationFormattedException("jobs.dir");
+ }
+
+ String normalizedName = String.format("hive-job-%s", job.getId());
+ String timestamp = new SimpleDateFormat("yyyy-MM-dd_hh-mm").format(new Date());
+ return String.format(userScriptsPath +
+ "/%s-%s", normalizedName, timestamp);
+ }
+
+ private void setupQueryFile() {
+ String statusDir = job.getStatusDir();
+ assert statusDir != null : "setupStatusDir() should be called first";
+
+ String jobQueryFilePath = statusDir + "/" + "query.hql";
+
+ try {
+
+ if (job.getForcedContent() != null) {
+
+ HdfsUtil.putStringToFile(hdfsApi, jobQueryFilePath, job.getForcedContent());
+ job.setForcedContent(""); // prevent forcedContent to be written to DB
+
+ }
+ else if (job.getQueryId() != null) {
+
+ String savedQueryFile = getRelatedSavedQueryFile();
+ hdfsApi.copy(savedQueryFile, jobQueryFilePath);
+ job.setQueryFile(jobQueryFilePath);
+
+ } else {
+
+ throw new BadRequestFormattedException("queryId or forcedContent should be passed!", null);
+
+ }
+
+ } catch (IOException e) {
+ throw new ServiceFormattedException("Error in creation: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ throw new ServiceFormattedException("Error in creation: " + e.toString(), e);
+ }
+ job.setQueryFile(jobQueryFilePath);
+
+ LOG.debug("Query file for job#" + job.getId() + ": " + jobQueryFilePath);
+ }
+
+ private String getRelatedSavedQueryFile() {
+ SavedQuery savedQuery;
+ try {
+ savedQuery = savedQueryResourceManager.read(job.getQueryId());
+ } catch (ItemNotFound itemNotFound) {
+ throw new BadRequestFormattedException("queryId not found!", itemNotFound);
+ }
+ return savedQuery.getQueryFile();
+ }
+
+ @Override
+ public boolean onModification(Object object) {
+ setModified(true);
+ return true;
+ }
+
+ @Override
+ public boolean isModified() {
+ return modified;
+ }
+
+ public void setModified(boolean modified) {
+ this.modified = modified;
+ }
+
+ @Override
+ public void clearModified() {
+ setModified(false);
+ }
+}