You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ni...@apache.org on 2016/12/23 10:43:02 UTC
[2/2] ambari git commit: AMBARI-19256 : Asset support Rest API
(Belliraj HB via nitirajrathore)
AMBARI-19256 : Asset support Rest API (Belliraj HB via nitirajrathore)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/47a98824
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/47a98824
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/47a98824
Branch: refs/heads/trunk
Commit: 47a98824566ad3015b9c9655a53b34e171f43ae9
Parents: e900093
Author: Nitiraj Rathore <ni...@gmail.com>
Authored: Fri Dec 23 16:11:12 2016 +0530
Committer: Nitiraj Rathore <ni...@gmail.com>
Committed: Fri Dec 23 16:12:35 2016 +0530
----------------------------------------------------------------------
.../ambari/view/AssetDefinitionRefType.java | 23 +
.../org/apache/oozie/ambari/view/Constants.java | 25 +
.../apache/oozie/ambari/view/EntityStatus.java | 23 +
.../apache/oozie/ambari/view/OozieDelegate.java | 243 ++++
.../ambari/view/OozieProxyImpersonator.java | 1087 ++++++++----------
.../apache/oozie/ambari/view/OozieUtils.java | 226 ++--
.../ambari/view/ServiceFormattedException.java | 53 +
.../oozie/ambari/view/WorkflowFilesService.java | 176 +--
.../ambari/view/assets/AssetDefinitionRepo.java | 29 +
.../oozie/ambari/view/assets/AssetRepo.java | 37 +-
.../oozie/ambari/view/assets/AssetResource.java | 197 ++++
.../oozie/ambari/view/assets/AssetService.java | 102 +-
.../ambari/view/assets/model/ActionAsset.java | 112 +-
.../assets/model/ActionAssetDefinition.java | 42 +
.../view/assets/model/AssetDefintion.java | 69 ++
.../oozie/ambari/view/model/APIResult.java | 63 +
.../oozie/ambari/view/model/BaseModel.java | 61 +-
.../apache/oozie/ambari/view/model/Indexed.java | 24 +
.../apache/oozie/ambari/view/model/Paging.java | 30 +
.../apache/oozie/ambari/view/model/When.java | 28 +
.../apache/oozie/ambari/view/repo/BaseRepo.java | 113 ++
.../workflowmanager/WorkflowManagerService.java | 94 +-
.../WorkflowsManagerResource.java | 8 +-
.../view/workflowmanager/WorkflowsRepo.java | 63 +-
.../view/workflowmanager/model/Workflow.java | 133 +--
.../views/wfmanager/src/main/resources/view.xml | 17 +
26 files changed, 2006 insertions(+), 1072 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java
new file mode 100644
index 0000000..8c96504
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+public enum AssetDefinitionRefType {
+ HDFS,
+ DB
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java
new file mode 100644
index 0000000..238b002
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+public class Constants {
+ public static final String STATUS_FAILED = "failed";
+ public static final String STATUS_OK = "ok";
+ public static final String STATUS_KEY = "status";
+ public static final String MESSAGE_KEY = "message";
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java
new file mode 100644
index 0000000..6447cf2
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+public enum EntityStatus {
+ DRAFT,
+ PUBLISHED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java
new file mode 100644
index 0000000..2779f05
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OozieDelegate {
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(OozieDelegate.class);
+ private static final String OOZIEPARAM_PREFIX = "oozieparam.";
+ private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX
+ .length();
+ private static final String EQUAL_SYMBOL = "=";
+ private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes";
+ private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath";
+ private static final String USER_NAME_HEADER = "user.name";
+ private static final String USER_OOZIE_SUPER = "oozie";
+ private static final String DO_AS_HEADER = "doAs";
+ private static final String SERVICE_URI_PROP = "oozie.service.uri";
+ private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie";
+
+ private ViewContext viewContext;
+
+ private OozieUtils oozieUtils = new OozieUtils();
+ private final Utils utils = new Utils();
+ private final AmbariIOUtil ambariIOUtil;
+
+ public OozieDelegate(ViewContext viewContext) {
+ super();
+ this.viewContext = viewContext;
+ this.ambariIOUtil = new AmbariIOUtil(viewContext);
+ }
+
+ public String submitWorkflowJobToOozie(HttpHeaders headers,
+ String filePath, MultivaluedMap<String, String> queryParams,
+ JobType jobType) {
+ String nameNode = "hdfs://"
+ + viewContext.getCluster().getConfigurationValue("hdfs-site",
+ "dfs.namenode.rpc-address");
+
+ if (!queryParams.containsKey("config.nameNode")) {
+ ArrayList<String> nameNodes = new ArrayList<String>();
+ LOGGER.info("Namenode===" + nameNode);
+ nameNodes.add(nameNode);
+ queryParams.put("config.nameNode", nameNodes);
+ }
+
+ Map<String, String> workflowConigs = getWorkflowConfigs(filePath,
+ queryParams, jobType, nameNode);
+ String configXMl = oozieUtils.generateConfigXml(workflowConigs);
+ LOGGER.info("Config xml==" + configXMl);
+ HashMap<String, String> customHeaders = new HashMap<String, String>();
+ customHeaders.put("Content-Type", "application/xml;charset=UTF-8");
+ Response serviceResponse = consumeService(headers, getServiceUri()
+ + "/v2/jobs?" + getJobSumbitOozieParams(queryParams),
+ HttpMethod.POST, configXMl, customHeaders);
+
+ LOGGER.info("Resp from oozie status entity=="
+ + serviceResponse.getEntity());
+ if (serviceResponse.getEntity() instanceof String) {
+ return (String) serviceResponse.getEntity();
+ } else {
+ return "success";
+ }
+ }
+
+ public Response consumeService(HttpHeaders headers, String path,
+ MultivaluedMap<String, String> queryParameters, String method,
+ String body) throws Exception {
+ return consumeService(headers, this.buildUri(path, queryParameters),
+ method, body, null);
+ }
+
+ private Response consumeService(HttpHeaders headers, String urlToRead,
+ String method, String body, Map<String, String> customHeaders) {
+ Response response = null;
+ InputStream stream = readFromOozie(headers, urlToRead, method, body,
+ customHeaders);
+ String stringResponse = null;
+ try {
+ stringResponse = IOUtils.toString(stream);
+ } catch (IOException e) {
+ LOGGER.error("Error while converting stream to string", e);
+ throw new RuntimeException(e);
+ }
+ if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) {
+ response = Response.status(Response.Status.BAD_REQUEST)
+ .entity(stringResponse).type(MediaType.TEXT_PLAIN).build();
+ } else {
+ response = Response.status(Response.Status.OK)
+ .entity(stringResponse)
+ .type(utils.deduceType(stringResponse)).build();
+ }
+ return response;
+ }
+
+ public InputStream readFromOozie(HttpHeaders headers, String urlToRead,
+ String method, String body, Map<String, String> customHeaders) {
+
+ Map<String, String> newHeaders = utils.getHeaders(headers);
+ newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER);
+
+ newHeaders.put(DO_AS_HEADER, viewContext.getUsername());
+ newHeaders.put("Accept", MediaType.APPLICATION_JSON);
+ if (customHeaders != null) {
+ newHeaders.putAll(customHeaders);
+ }
+ LOGGER.info(String.format("Proxy request for url: [%s] %s", method,
+ urlToRead));
+
+ return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders);
+ }
+
+ private Map<String, String> getWorkflowConfigs(String filePath,
+ MultivaluedMap<String, String> queryParams, JobType jobType,
+ String nameNode) {
+ HashMap<String, String> workflowConigs = new HashMap<String, String>();
+ if (queryParams.containsKey("resourceManager")
+ && "useDefault".equals(queryParams.getFirst("resourceManager"))) {
+ String jobTrackerNode = viewContext.getCluster()
+ .getConfigurationValue("yarn-site",
+ "yarn.resourcemanager.address");
+ LOGGER.info("jobTrackerNode===" + jobTrackerNode);
+ workflowConigs.put("resourceManager", jobTrackerNode);
+ workflowConigs.put("jobTracker", jobTrackerNode);
+ }
+ if (queryParams != null) {
+ for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
+ if (entry.getKey().startsWith("config.")) {
+ if (entry.getValue() != null && entry.getValue().size() > 0) {
+ workflowConigs.put(entry.getKey().substring(7), entry
+ .getValue().get(0));
+ }
+ }
+ }
+ }
+
+ if (queryParams.containsKey("oozieconfig.useSystemLibPath")) {
+ String useSystemLibPath = queryParams
+ .getFirst("oozieconfig.useSystemLibPath");
+ workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY,
+ useSystemLibPath);
+ } else {
+ workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, "true");
+ }
+ if (queryParams.containsKey("oozieconfig.rerunOnFailure")) {
+ String rerunFailnodes = queryParams
+ .getFirst("oozieconfig.rerunOnFailure");
+ workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY,
+ rerunFailnodes);
+ } else {
+ workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, "true");
+ }
+ workflowConigs.put("user.name", viewContext.getUsername());
+ workflowConigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode
+ + filePath);
+ return workflowConigs;
+ }
+
+ private String getJobSumbitOozieParams(
+ MultivaluedMap<String, String> queryParams) {
+ StringBuilder query = new StringBuilder();
+ if (queryParams != null) {
+ for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
+ if (entry.getKey().startsWith(OOZIEPARAM_PREFIX)) {
+ if (entry.getValue() != null && entry.getValue().size() > 0) {
+ for (String val : entry.getValue()) {
+ query.append(
+ entry.getKey().substring(
+ OOZIEPARAM_PREFIX_LENGTH))
+ .append(EQUAL_SYMBOL).append(val)
+ .append("&");
+ }
+ }
+ }
+ }
+ }
+ return query.toString();
+ }
+
+ private String getServiceUri() {
+ String serviceURI = viewContext.getProperties().get(SERVICE_URI_PROP) != null ? viewContext
+ .getProperties().get(SERVICE_URI_PROP) : DEFAULT_SERVICE_URI;
+ return serviceURI;
+ }
+
+ private String buildUri(String absolutePath,
+ MultivaluedMap<String, String> queryParameters) {
+ int index = absolutePath.indexOf("proxy/") + 5;
+ absolutePath = absolutePath.substring(index);
+ String serviceURI = getServiceUri();
+ serviceURI += absolutePath;
+ MultivaluedMap<String, String> params = addOrReplaceUserName(queryParameters);
+ return serviceURI + utils.convertParamsToUrl(params);
+ }
+
+ private MultivaluedMap<String, String> addOrReplaceUserName(
+ MultivaluedMap<String, String> parameters) {
+ for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
+ if ("user.name".equals(entry.getKey())) {
+ ArrayList<String> vals = new ArrayList<String>(1);
+ vals.add(viewContext.getUsername());
+ entry.setValue(vals);
+ }
+ }
+ return parameters;
+ }
+
+ public String getDagUrl(String jobid) {
+ return getServiceUri() + "/v2/job/" + jobid + "?show=graph";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
index 08a166d..9dd02d4 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
@@ -17,12 +17,14 @@
*/
package org.apache.oozie.ambari.view;
+import static org.apache.oozie.ambari.view.Constants.MESSAGE_KEY;
+import static org.apache.oozie.ambari.view.Constants.STATUS_KEY;
+import static org.apache.oozie.ambari.view.Constants.STATUS_OK;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import javax.inject.Inject;
@@ -40,8 +42,8 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
@@ -50,6 +52,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.oozie.ambari.view.assets.AssetResource;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowManagerService;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowsManagerResource;
import org.slf4j.Logger;
@@ -63,639 +66,449 @@ import com.google.inject.Singleton;
*/
@Singleton
public class OozieProxyImpersonator {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(OozieProxyImpersonator.class);
-
- private static final String OOZIEPARAM_PREFIX = "oozieparam.";
- private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX
- .length();
- private static final String EQUAL_SYMBOL = "=";
- private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes";
- private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath";
-
- private ViewContext viewContext;
-
- private static final String USER_NAME_HEADER = "user.name";
- private static final String USER_OOZIE_SUPER = "oozie";
- private static final String DO_AS_HEADER = "doAs";
-
- private static final String SERVICE_URI_PROP = "oozie.service.uri";
- private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie";
- private Utils utils = new Utils();
- private AmbariIOUtil ambariIOUtil;
- private OozieUtils oozieUtils = new OozieUtils();
- private HDFSFileUtils hdfsFileUtils;
- private WorkflowFilesService workflowFilesService;
- //private WorkflowManagerService workflowManagerService;
-
- private static enum ErrorCodes {
- OOZIE_SUBMIT_ERROR("error.oozie.submit", "Oozie Submit error"), OOZIE_IO_ERROR(
- "error.oozie.io", "Oozie I/O error"), FILE_ACCESS_ACL_ERROR(
- "error.file.access.control",
- "Access Error to file due to access control"), FILE_ACCESS_UNKNOWN_ERROR(
- "error.file.access", "Error accessing file"), WORKFLOW_PATH_EXISTS(
- "error.workflow.path.exists", "Worfklow path exists");
- private String errorCode;
- private String description;
-
- ErrorCodes(String errorCode, String description) {
- this.errorCode = errorCode;
- this.description = description;
- }
-
- public String getErrorCode() {
- return errorCode;
- }
-
- public String getDescription() {
- return description;
- }
- }
-
- @Inject
- public OozieProxyImpersonator(ViewContext viewContext) {
- this.viewContext = viewContext;
- hdfsFileUtils = new HDFSFileUtils(viewContext);
- workflowFilesService = new WorkflowFilesService(hdfsFileUtils);
- ambariIOUtil=new AmbariIOUtil(viewContext);
- //workflowManagerService = new WorkflowManagerService(viewContext);
- LOGGER.info(String.format(
- "OozieProxyImpersonator initialized for instance: %s",
- viewContext.getInstanceName()));
-
- }
-
- @Path("/fileServices")
- public FileServices fileServices() {
- return new FileServices(viewContext);
- }
-
- @Path("/wfprojects")
- public WorkflowsManagerResource workflowsManagerResource() {
- return new WorkflowsManagerResource(viewContext);
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(OozieProxyImpersonator.class);
+
+ private ViewContext viewContext;
+ private final Utils utils = new Utils();
+
+
+ private final HDFSFileUtils hdfsFileUtils;
+ private final WorkflowFilesService workflowFilesService;
+ private WorkflowManagerService workflowManagerService;
+ private static final boolean PROJ_MANAGER_ENABLED = false;
+ private final OozieDelegate oozieDelegate;
+ private final OozieUtils oozieUtils = new OozieUtils();
+ private final AssetResource assetResource;
+ private final AmbariIOUtil ambariIOUtil;
+ private static enum ErrorCodes {
+ OOZIE_SUBMIT_ERROR("error.oozie.submit", "Oozie Submit error"), OOZIE_IO_ERROR(
+ "error.oozie.io", "Oozie I/O error"), FILE_ACCESS_ACL_ERROR(
+ "error.file.access.control",
+ "Access Error to file due to access control"), FILE_ACCESS_UNKNOWN_ERROR(
+ "error.file.access", "Error accessing file"), WORKFLOW_PATH_EXISTS(
+ "error.workflow.path.exists", "Worfklow path exists");
+ private String errorCode;
+ private String description;
+
+ ErrorCodes(String errorCode, String description) {
+ this.errorCode = errorCode;
+ this.description = description;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+ }
+
+ @Inject
+ public OozieProxyImpersonator(ViewContext viewContext) {
+ this.viewContext = viewContext;
+ hdfsFileUtils = new HDFSFileUtils(viewContext);
+ workflowFilesService = new WorkflowFilesService(hdfsFileUtils);
+ this.oozieDelegate = new OozieDelegate(viewContext);
+ assetResource = new AssetResource(viewContext);
+ if (PROJ_MANAGER_ENABLED) {
+ workflowManagerService = new WorkflowManagerService(viewContext);
+ }
+ ambariIOUtil=new AmbariIOUtil(viewContext);
+
+ LOGGER.info(String.format(
+ "OozieProxyImpersonator initialized for instance: %s",
+ viewContext.getInstanceName()));
+
+ }
+
+ @Path("/fileServices")
+ public FileServices fileServices() {
+ return new FileServices(viewContext);
+ }
+
+ @Path("/wfprojects")
+ public WorkflowsManagerResource workflowsManagerResource() {
+ return new WorkflowsManagerResource(viewContext);
+ }
+
+ @Path("/assets")
+ public AssetResource assetResource() {
+ return this.assetResource;
+ }
+
+ @GET
+ @Path("/getCurrentUserName")
+ public Response getCurrentUserName() {
+ return Response.ok(viewContext.getUsername()).build();
+ }
+
+ @POST
+ @Path("/submitJob")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response submitJob(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("app.path") String appPath,
+ @QueryParam("projectId") String projectId,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
+ @QueryParam("description") String description,
+ @QueryParam("jobType") String jobType) {
+ LOGGER.info("submit workflow job called");
+ return submitJobInternal(postBody, headers, ui, appPath, overwrite,
+ JobType.valueOf(jobType), projectId, description);
+ }
+
+ @POST
+ @Path("/saveWorkflow")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response saveWorkflow(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("app.path") String appPath, @QueryParam("description") String description,
+ @QueryParam("projectId") String projectId, @QueryParam("jobType") String jobType,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
+ LOGGER.info("save workflow called");
+ if (StringUtils.isEmpty(appPath)) {
+ throw new RuntimeException("app path can't be empty.");
+ }
+ appPath = appPath.trim();
+ if (!overwrite) {
+ boolean fileExists = hdfsFileUtils.fileExists(appPath);
+ if (fileExists) {
+ return getFileExistsResponse();
+ }
+ }
+ postBody = utils.formatXml(postBody);
+ try {
+ String filePath = workflowFilesService.createWorkflowFile(appPath,
+ postBody, overwrite);
+ LOGGER.info(String.format(
+ "submit workflow job done. filePath=[%s]", filePath));
+ if (PROJ_MANAGER_ENABLED) {
+ JobType deducedJobType = oozieUtils.deduceJobType(postBody);
+ String workflowName = oozieUtils.deduceWorkflowNameFromXml(postBody);
+ workflowManagerService.saveWorkflow(projectId, appPath,
+ deducedJobType, description,
+ viewContext.getUsername(), workflowName);
+ }
+
+ return Response.ok().build();
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage(), ex);
+ return getRespCodeForException(ex);
+
+ }
+ }
+
+ @POST
+ @Path("/saveWorkflowDraft")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response saveDraft(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("app.path") String appPath,
+ @QueryParam("projectId") String projectId, @QueryParam("description") String description,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite, @QueryParam("jobType") String jobTypeStr)
+ throws IOException {
+ LOGGER.info("save workflow called");
+ if (StringUtils.isEmpty(appPath)) {
+ throw new RuntimeException("app path can't be empty.");
+ }
+ appPath = appPath.trim();
+ workflowFilesService.saveDraft(appPath, postBody, overwrite);
+ if (PROJ_MANAGER_ENABLED) {
+ JobType jobType = StringUtils.isEmpty(jobTypeStr) ? JobType.WORKFLOW : JobType.valueOf(jobTypeStr);
+ String name = oozieUtils.deduceWorkflowNameFromJson(postBody);
+ workflowManagerService.saveWorkflow(projectId, appPath,
+ jobType, description,
+ viewContext.getUsername(), name);
+ }
+ return Response.ok().build();
+ }
+
+ @POST
+ @Path("/publishAsset")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response publishAsset(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
+ LOGGER.info("publish asset called");
+ if (StringUtils.isEmpty(uploadPath)) {
+ throw new RuntimeException("upload path can't be empty.");
+ }
+ uploadPath = uploadPath.trim();
+ Map<String, String> validateAsset = assetResource.validateAsset(headers, postBody,
+ ui.getQueryParameters());
+ if (!STATUS_OK.equals(validateAsset.get(STATUS_KEY))) {
+ return Response.status(Status.BAD_REQUEST).entity(
+ validateAsset.get(MESSAGE_KEY)).build();
+ }
+ return saveAsset(postBody, uploadPath, overwrite);
+ }
+
+ private Response saveAsset(String postBody, String uploadPath,
+ Boolean overwrite) {
+ uploadPath = workflowFilesService.getAssetFileName(uploadPath);
+ if (!overwrite) {
+ boolean fileExists = hdfsFileUtils.fileExists(uploadPath);
+ if (fileExists) {
+ return getFileExistsResponse();
+ }
+ }
+ postBody = utils.formatXml(postBody);
+ try {
+ String filePath = workflowFilesService.createAssetFile(uploadPath,
+ postBody, overwrite);
+ LOGGER.info(String.format("publish asset job done. filePath=[%s]",
+ filePath));
+ return Response.ok().build();
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage(), ex);
+ return getRespCodeForException(ex);
+ }
+ }
+
+
+ @GET
+ @Path("/readWorkflowDraft")
+ public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) {
+ if (StringUtils.isEmpty(workflowPath)) {
+ throw new RuntimeException("workflowXmlPath can't be empty.");
+ }
+ try {
+ final InputStream is = workflowFilesService.readDraft(workflowPath);
+ StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException,
+ WebApplicationException {
+ IOUtils.copy(is, os);
+ is.close();
+ os.close();
+ }
+ };
+ return Response.ok(streamer).status(200).build();
+ } catch (IOException e) {
+ return getRespCodeForException(e);
+ }
+ }
+
+ @POST
+ @Path("/discardWorkflowDraft")
+ public Response discardDraft(
+ @QueryParam("workflowXmlPath") String workflowPath)
+ throws IOException {
+ workflowFilesService.discardDraft(workflowPath);
+ return Response.ok().build();
+ }
+
+ private Response submitJobInternal(String postBody, HttpHeaders headers,
+ UriInfo ui, String appPath, Boolean overwrite, JobType jobType,
+ String projectId, String description) {
+ if (StringUtils.isEmpty(appPath)) {
+ throw new RuntimeException("app path can't be empty.");
+ }
+ appPath = appPath.trim();
+ if (!overwrite) {
+ boolean fileExists = hdfsFileUtils.fileExists(appPath);
+ if (fileExists) {
+ return getFileExistsResponse();
+ }
+ }
+ postBody = utils.formatXml(postBody);
+ try {
+ String filePath = hdfsFileUtils.writeToFile(appPath, postBody,
+ overwrite);
+ LOGGER.info(String.format(
+ "submit workflow job done. filePath=[%s]", filePath));
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage(), ex);
+ return getRespCodeForException(ex);
+
+ }
+ if (PROJ_MANAGER_ENABLED) {
+ String name = oozieUtils.deduceWorkflowNameFromXml(postBody);
+ workflowManagerService.saveWorkflow(projectId, appPath, jobType,
+ "todo description", viewContext.getUsername(), name);
+ }
+
+ String response = oozieDelegate.submitWorkflowJobToOozie(headers,
+ appPath, ui.getQueryParameters(), jobType);
+ if (response != null && response.trim().startsWith("{")) {
+ // dealing with oozie giving error but with 200 response.
+ return Response.status(Response.Status.OK).entity(response).build();
+ } else {
+ HashMap<String, String> resp = new HashMap<String, String>();
+ resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode());
+ resp.put("message", response);
+ return Response.status(Response.Status.BAD_REQUEST).entity(resp)
+ .build();
+ }
+
+ }
+
+ private Response getRespCodeForException(Exception ex) {
+ if (ex instanceof AccessControlException) {
+ HashMap<String, String> errorDetails = getErrorDetails(
+ ErrorCodes.FILE_ACCESS_ACL_ERROR.getErrorCode(),
+ ErrorCodes.FILE_ACCESS_ACL_ERROR.getDescription(), ex);
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(errorDetails).build();
+ } else if (ex instanceof IOException) {
+ HashMap<String, String> errorDetails = getErrorDetails(
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(errorDetails).build();
+ } else {
+ HashMap<String, String> errorDetails = getErrorDetails(
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(errorDetails).build();
+ }
+
+ }
+
+ private Response getFileExistsResponse() {
+ HashMap<String, String> resp = new HashMap<String, String>();
+ resp.put("status", ErrorCodes.WORKFLOW_PATH_EXISTS.getErrorCode());
+ resp.put("message", ErrorCodes.WORKFLOW_PATH_EXISTS.getDescription());
+ return Response.status(Response.Status.BAD_REQUEST).entity(resp)
+ .build();
+ }
+
+ @GET
+ @Path("/readWorkflowDetail")
+ public Response isDraftAvailable(
+ @QueryParam("workflowXmlPath") String workflowPath) {
+ WorkflowFileInfo workflowDetails = workflowFilesService
+ .getWorkflowDetails(workflowPath);
+ return Response.ok(workflowDetails).build();
+ }
+
+ @GET
+ @Path("/readWorkflowXml")
+ public Response readWorkflowXxml(
+ @QueryParam("workflowXmlPath") String workflowPath) {
+ if (StringUtils.isEmpty(workflowPath)) {
+ throw new RuntimeException("workflowXmlPath can't be empty.");
+ }
+ try {
+ final InputStream is = workflowFilesService
+ .readWorkflowXml(workflowPath);
+ StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException,
+ WebApplicationException {
+ IOUtils.copy(is, os);
+ is.close();
+ os.close();
+ }
+ };
+ return Response.ok(streamer).status(200).build();
+ } catch (IOException e) {
+ return getRespCodeForException(e);
+ }
+ }
+
+ private HashMap<String, String> getErrorDetails(String status,
+ String message, Exception ex) {
+ HashMap<String, String> resp = new HashMap<String, String>();
+ resp.put("status", status);
+ if (message != null) {
+ resp.put("message", message);
+ }
+ if (ex != null) {
+ resp.put("stackTrace", ExceptionUtils.getFullStackTrace(ex));
+ }
+ return resp;
+ }
+
+ @GET
+ @Path("/getDag")
+ @Produces("image/png")
+ public Response getDag(@Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("jobid") String jobid) {
+ Map<String, String> newHeaders = utils.getHeaders(headers);
+ final InputStream is = oozieDelegate.readFromOozie(headers,
+ oozieDelegate.getDagUrl(jobid), HttpMethod.GET, null,
+ newHeaders);
+ StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException,
+ WebApplicationException {
+ IOUtils.copy(is, os);
+ is.close();
+ os.close();
+ }
+
+ };
+ return Response.ok(streamer).status(200).build();
+ }
+
+ @GET
+ @Path("/{path: .*}")
+ public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) {
+ try {
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.GET, null);
+ } catch (Exception ex) {
+ LOGGER.error("Error in GET proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ @POST
+ @Path("/{path: .*}")
+ public Response handlePost(String xml, @Context HttpHeaders headers,
+ @Context UriInfo ui) {
+ try {
+
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.POST, xml);
+ } catch (Exception ex) {
+ LOGGER.error("Error in POST proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ @DELETE
+ @Path("/{path: .*}")
+ public Response handleDelete(@Context HttpHeaders headers,
+ @Context UriInfo ui) {
+ try {
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.POST, null);
+ } catch (Exception ex) {
+ LOGGER.error("Error in DELETE proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ @PUT
+ @Path("/{path: .*}")
+ public Response handlePut(String body, @Context HttpHeaders headers,
+ @Context UriInfo ui) {
+ try {
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.PUT, body);
+ } catch (Exception ex) {
+ LOGGER.error("Error in PUT proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ private Map<String, String> getErrorDetailsForException(String component,
+ Exception ex) {
+ String errorCode = component + "exception";
+ String errorMessage = component + " Exception";
+ if (ex instanceof RuntimeException) {
+ Throwable cause = ex.getCause();
+ if (cause instanceof IOException) {
+ errorCode = component + "io.exception";
+ errorMessage = component + "IO Exception";
+ }
}
-
- @GET
- @Path("/getCurrentUserName")
- public Response getCurrentUserName() {
- return Response.ok(viewContext.getUsername()).build();
- }
-
- @POST
- @Path("/submitJob")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response submitJob(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
- @QueryParam("jobType") String jobType) {
- LOGGER.info("submit workflow job called");
- return submitJobInternal(postBody, headers, ui, appPath, overwrite,
- JobType.valueOf(jobType));
- }
-
- @POST
- @Path("/submitWorkflow")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response submitWorkflow(String postBody,
- @Context HttpHeaders headers, @Context UriInfo ui,
- @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
- LOGGER.info("submit workflow job called");
- return submitJobInternal(postBody, headers, ui, appPath, overwrite,
- JobType.WORKFLOW);
- }
-
- @POST
- @Path("/saveWorkflow")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response saveWorkflow(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
- LOGGER.info("save workflow called");
- if (StringUtils.isEmpty(appPath)) {
- throw new RuntimeException("app path can't be empty.");
- }
- appPath = appPath.trim();
- if (!overwrite) {
- boolean fileExists = hdfsFileUtils.fileExists(appPath);
- if (fileExists) {
- return getFileExistsResponse();
- }
- }
- postBody = utils.formatXml(postBody);
- try {
- String filePath = workflowFilesService.createWorkflowFile(appPath,
- postBody, overwrite);
- LOGGER.info(String.format(
- "submit workflow job done. filePath=[%s]", filePath));
- /* workflowManagerService.saveWorkflow(appPath, JobType.WORKFLOW,
- "todo description", viewContext.getUsername());*/
- return Response.ok().build();
- } catch (Exception ex) {
- LOGGER.error(ex.getMessage(), ex);
- return getRespCodeForException(ex);
-
- }
- }
-
- @POST
- @Path("/publishAsset")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response publishAsset(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
- LOGGER.info("publish asset called");
- if (StringUtils.isEmpty(uploadPath)) {
- throw new RuntimeException("upload path can't be empty.");
- }
- uploadPath = uploadPath.trim();
- Response dryRunResponse = validateAsset(headers, postBody, ui.getQueryParameters());
- if (dryRunResponse.getStatus() == 200) {
- return saveAsset(postBody, uploadPath, overwrite);
- }
- return dryRunResponse;
- }
-
- private Response validateAsset(HttpHeaders headers, String postBody, MultivaluedMap<String, String> queryParams) {
- String workflowXml = oozieUtils.generateWorkflowXml(postBody);
- try {
- String tempWfPath = "/user/"+viewContext.getUsername()+"/tmpooziewfs/tempwf.xml";
- hdfsFileUtils.writeToFile(tempWfPath, workflowXml, true);
- queryParams.put("oozieparam.action",getAsList("dryrun"));
- queryParams.put("oozieconfig.rerunOnFailure",getAsList("false"));
- queryParams.put("oozieconfig.useSystemLibPath",getAsList("true"));
- queryParams.put("resourceManager",getAsList("useDefault"));
- String dryRunResp = submitWorkflowJobToOozie(headers,tempWfPath,queryParams,JobType.WORKFLOW);
- LOGGER.info(String.format("resp from validating asset=[%s]",dryRunResp));
- if (dryRunResp != null && dryRunResp.trim().startsWith("{")) {
- return Response.status(Response.Status.OK).entity(dryRunResp).build();
- } else {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode());
- resp.put("message", dryRunResp);
- //resp.put("stackTrace", dryRunResp);
- return Response.status(Response.Status.BAD_REQUEST).entity(resp).build();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private List<String> getAsList(String string) {
- ArrayList<String> li=new ArrayList<>(1);
- li.add(string);
- return li;
- }
-
- private Response saveAsset(String postBody, String uploadPath,
- Boolean overwrite) {
- uploadPath = workflowFilesService.getAssetFileName(uploadPath);
- if (!overwrite) {
- boolean fileExists = hdfsFileUtils.fileExists(uploadPath);
- if (fileExists) {
- return getFileExistsResponse();
- }
- }
- postBody = utils.formatXml(postBody);
- try {
- String filePath = workflowFilesService.createAssetFile(uploadPath,
- postBody, overwrite);
- LOGGER.info(String.format(
- "publish asset job done. filePath=[%s]", filePath));
- return Response.ok().build();
- } catch (Exception ex) {
- LOGGER.error(ex.getMessage(), ex);
- return getRespCodeForException(ex);
- }
- }
-
- @POST
- @Path("/saveWorkflowDraft")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response saveDraft(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite)
- throws IOException {
- LOGGER.info("save workflow called");
- if (StringUtils.isEmpty(appPath)) {
- throw new RuntimeException("app path can't be empty.");
- }
- appPath = appPath.trim();
- workflowFilesService.saveDraft(appPath, postBody, overwrite);
- /* workflowManagerService.saveWorkflow(appPath, JobType.WORKFLOW,
- "todo description", viewContext.getUsername());*/
- return Response.ok().build();
- }
-
- @GET
- @Path("/readWorkflowDraft")
- public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) {
- if (StringUtils.isEmpty(workflowPath)) {
- throw new RuntimeException("workflowXmlPath can't be empty.");
- }
- try {
- final InputStream is = workflowFilesService.readDraft(workflowPath);
- StreamingOutput streamer = new StreamingOutput() {
- @Override
- public void write(OutputStream os) throws IOException,
- WebApplicationException {
- IOUtils.copy(is, os);
- is.close();
- os.close();
- }
- };
- return Response.ok(streamer).status(200).build();
- } catch (IOException e) {
- return getRespCodeForException(e);
- }
- }
-
- @POST
- @Path("/discardWorkflowDraft")
- public Response discardDraft(@QueryParam("workflowXmlPath") String workflowPath) throws IOException{
- workflowFilesService.discardDraft(workflowPath);
- return Response.ok().build();
- }
-
- private Response submitJobInternal(String postBody, HttpHeaders headers,
- UriInfo ui, String appPath, Boolean overwrite, JobType jobType) {
- if (StringUtils.isEmpty(appPath)) {
- throw new RuntimeException("app path can't be empty.");
- }
- appPath = appPath.trim();
- if (!overwrite) {
- boolean fileExists = hdfsFileUtils.fileExists(appPath);
- if (fileExists) {
- return getFileExistsResponse();
- }
- }
- postBody = utils.formatXml(postBody);
- try {
- String filePath = hdfsFileUtils.writeToFile(appPath, postBody,
- overwrite);
- LOGGER.info(String.format(
- "submit workflow job done. filePath=[%s]", filePath));
- } catch (Exception ex) {
- LOGGER.error(ex.getMessage(), ex);
- return getRespCodeForException(ex);
-
- }
- /* workflowManagerService.saveWorkflow(appPath, jobType,
- "todo description", viewContext.getUsername());*/
- String response = submitWorkflowJobToOozie(headers, appPath,
- ui.getQueryParameters(), jobType);
- if (response != null && response.trim().startsWith("{")) {
- // dealing with oozie giving error but with 200 response.
- return Response.status(Response.Status.OK).entity(response).build();
- } else {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode());
- resp.put("message", response);
- return Response.status(Response.Status.BAD_REQUEST).entity(resp)
- .build();
- }
-
- }
-
- private Response getRespCodeForException(Exception ex) {
- if (ex instanceof AccessControlException) {
- HashMap<String, String> errorDetails = getErrorDetails(
- ErrorCodes.FILE_ACCESS_ACL_ERROR.getErrorCode(),
- ErrorCodes.FILE_ACCESS_ACL_ERROR.getDescription(), ex);
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(errorDetails).build();
- } else if (ex instanceof IOException) {
- HashMap<String, String> errorDetails = getErrorDetails(
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(errorDetails).build();
- } else {
- HashMap<String, String> errorDetails = getErrorDetails(
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(errorDetails).build();
- }
-
- }
-
- private Response getFileExistsResponse() {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", ErrorCodes.WORKFLOW_PATH_EXISTS.getErrorCode());
- resp.put("message", ErrorCodes.WORKFLOW_PATH_EXISTS.getDescription());
- return Response.status(Response.Status.BAD_REQUEST).entity(resp)
- .build();
- }
-
- @GET
- @Path("/readWorkflowDetail")
- public Response isDraftAvailable(@QueryParam("workflowXmlPath") String workflowPath){
- WorkflowFileInfo workflowDetails = workflowFilesService.getWorkflowDetails(workflowPath);
- return Response.ok(workflowDetails).build();
- }
-
- @GET
- @Path("/readWorkflowXml")
- public Response readWorkflowXxml(
- @QueryParam("workflowXmlPath") String workflowPath) {
- if (StringUtils.isEmpty(workflowPath)) {
- throw new RuntimeException("workflowXmlPath can't be empty.");
- }
- try {
- final InputStream is = workflowFilesService.readWorkflowXml(workflowPath);
- StreamingOutput streamer = new StreamingOutput() {
- @Override
- public void write(OutputStream os) throws IOException,
- WebApplicationException {
- IOUtils.copy(is, os);
- is.close();
- os.close();
- }
- };
- return Response.ok(streamer).status(200).build();
- } catch (IOException e) {
- return getRespCodeForException(e);
- }
- }
-
- private HashMap<String, String> getErrorDetails(String status,
- String message, Exception ex) {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", status);
- if (message != null) {
- resp.put("message", message);
- }
- if (ex != null) {
- resp.put("stackTrace", ExceptionUtils.getFullStackTrace(ex));
- }
- return resp;
- }
-
- @GET
- @Path("/getDag")
- @Produces("image/png")
- public Response submitWorkflow(@Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("jobid") String jobid) {
- String imgUrl = getServiceUri() + "/v2/job/" + jobid + "?show=graph";
- Map<String, String> newHeaders = utils.getHeaders(headers);
- final InputStream is = readFromOozie(headers, imgUrl, HttpMethod.GET,
- null, newHeaders);
- StreamingOutput streamer = new StreamingOutput() {
-
- @Override
- public void write(OutputStream os) throws IOException,
- WebApplicationException {
- IOUtils.copy(is, os);
- is.close();
- os.close();
- }
-
- };
- return Response.ok(streamer).status(200).build();
- }
-
- @GET
- @Path("/{path: .*}")
- public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.GET, null);
- } catch (Exception ex) {
- LOGGER.error("Error in GET proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- @POST
- @Path("/{path: .*}")
- public Response handlePost(String xml, @Context HttpHeaders headers,
- @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.POST, xml);
- } catch (Exception ex) {
- LOGGER.error("Error in POST proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- @DELETE
- @Path("/{path: .*}")
- public Response handleDelete(@Context HttpHeaders headers,
- @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.POST, null);
- } catch (Exception ex) {
- LOGGER.error("Error in DELETE proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- @PUT
- @Path("/{path: .*}")
- public Response handlePut(String body, @Context HttpHeaders headers,
- @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.PUT, body);
- } catch (Exception ex) {
- LOGGER.error("Error in PUT proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- private Map<String, String> getErrorDetailsForException(String component,
- Exception ex) {
- String errorCode = component + "exception";
- String errorMessage = component + " Exception";
- if (ex instanceof RuntimeException) {
- Throwable cause = ex.getCause();
- if (cause instanceof IOException) {
- errorCode = component + "io.exception";
- errorMessage = component + "IO Exception";
- }
- }
- return getErrorDetails(errorCode, errorMessage, ex);
- }
-
- private String submitWorkflowJobToOozie(HttpHeaders headers,
- String filePath, MultivaluedMap<String, String> queryParams,
- JobType jobType) {
- String nameNode = "hdfs://"
- + viewContext.getCluster().getConfigurationValue("hdfs-site",
- "dfs.namenode.rpc-address");
-
- if (!queryParams.containsKey("config.nameNode")) {
- ArrayList<String> nameNodes = new ArrayList<String>();
- LOGGER.info("Namenode===" + nameNode);
- nameNodes.add(nameNode);
- queryParams.put("config.nameNode", nameNodes);
- }
-
- Map<String, String> workflowConigs = getWorkflowConfigs(filePath,
- queryParams, jobType, nameNode);
- String configXMl = oozieUtils.generateConfigXml(workflowConigs);
- LOGGER.info("Config xml==" + configXMl);
- HashMap<String, String> customHeaders = new HashMap<String, String>();
- customHeaders.put("Content-Type", "application/xml;charset=UTF-8");
- Response serviceResponse = consumeService(headers, getServiceUri()
- + "/v2/jobs?" + getJobSumbitOozieParams(queryParams),
- HttpMethod.POST, configXMl, customHeaders);
-
- LOGGER.info("Resp from oozie status entity=="
- + serviceResponse.getEntity());
- if (serviceResponse.getEntity() instanceof String) {
- return (String) serviceResponse.getEntity();
- } else {
- return "success";
- }
-
- }
-
- private Map<String, String> getWorkflowConfigs(String filePath,
- MultivaluedMap<String, String> queryParams, JobType jobType,
- String nameNode) {
- HashMap<String, String> workflowConigs = new HashMap<String, String>();
- if (queryParams.containsKey("resourceManager")
- && "useDefault".equals(queryParams.getFirst("resourceManager"))) {
- String jobTrackerNode = viewContext.getCluster()
- .getConfigurationValue("yarn-site",
- "yarn.resourcemanager.address");
- LOGGER.info("jobTrackerNode===" + jobTrackerNode);
- workflowConigs.put("resourceManager", jobTrackerNode);
- workflowConigs.put("jobTracker", jobTrackerNode);
- }
- if (queryParams != null) {
- for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
- if (entry.getKey().startsWith("config.")) {
- if (entry.getValue() != null && entry.getValue().size() > 0) {
- workflowConigs.put(entry.getKey().substring(7), entry
- .getValue().get(0));
- }
- }
- }
- }
-
- if (queryParams.containsKey("oozieconfig.useSystemLibPath")) {
- String useSystemLibPath = queryParams
- .getFirst("oozieconfig.useSystemLibPath");
- workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY,
- useSystemLibPath);
- } else {
- workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, "true");
- }
- if (queryParams.containsKey("oozieconfig.rerunOnFailure")) {
- String rerunFailnodes = queryParams
- .getFirst("oozieconfig.rerunOnFailure");
- workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY,
- rerunFailnodes);
- } else {
- workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, "true");
- }
- workflowConigs.put("user.name", viewContext.getUsername());
- workflowConigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode
- + filePath);
- return workflowConigs;
- }
-
- private String getJobSumbitOozieParams(
- MultivaluedMap<String, String> queryParams) {
- StringBuilder query = new StringBuilder();
- if (queryParams != null) {
- for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
- if (entry.getKey().startsWith(OOZIEPARAM_PREFIX)) {
- if (entry.getValue() != null && entry.getValue().size() > 0) {
- for (String val : entry.getValue()) {
- query.append(
- entry.getKey().substring(
- OOZIEPARAM_PREFIX_LENGTH))
- .append(EQUAL_SYMBOL).append(val)
- .append("&");
- }
- }
- }
- }
- }
- return query.toString();
- }
-
- private String buildURI(UriInfo ui) {
- String uiURI = ui.getAbsolutePath().getPath();
- int index = uiURI.indexOf("proxy/") + 5;
- uiURI = uiURI.substring(index);
- String serviceURI = getServiceUri();
- serviceURI += uiURI;
- MultivaluedMap<String, String> params = addOrReplaceUserName(ui
- .getQueryParameters());
- return serviceURI + utils.convertParamsToUrl(params);
- }
-
- private MultivaluedMap<String, String> addOrReplaceUserName(
- MultivaluedMap<String, String> parameters) {
- for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
- if ("user.name".equals(entry.getKey())) {
- ArrayList<String> vals = new ArrayList<String>(1);
- vals.add(viewContext.getUsername());
- entry.setValue(vals);
- }
- }
- return parameters;
- }
-
- private String getServiceUri() {
- String serviceURI = viewContext.getProperties().get(SERVICE_URI_PROP) != null ? viewContext
- .getProperties().get(SERVICE_URI_PROP) : DEFAULT_SERVICE_URI;
- return serviceURI;
- }
-
- private Response consumeService(HttpHeaders headers, String urlToRead,
- String method, String body) throws Exception {
- return consumeService(headers, urlToRead, method, body, null);
- }
-
- private Response consumeService(HttpHeaders headers, String urlToRead,
- String method, String body, Map<String, String> customHeaders) {
- Response response = null;
- InputStream stream = readFromOozie(headers, urlToRead, method, body,
- customHeaders);
- String stringResponse = null;
- try {
- stringResponse = IOUtils.toString(stream);
- } catch (IOException e) {
- LOGGER.error("Error while converting stream to string", e);
- throw new RuntimeException(e);
- }
- if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) {
- response = Response.status(Response.Status.BAD_REQUEST)
- .entity(stringResponse).type(MediaType.TEXT_PLAIN).build();
- } else {
- response = Response.status(Response.Status.OK)
- .entity(stringResponse)
- .type(utils.deduceType(stringResponse)).build();
- }
- return response;
- }
-
- private InputStream readFromOozie(HttpHeaders headers, String urlToRead,
- String method, String body, Map<String, String> customHeaders) {
-
- Map<String, String> newHeaders = utils.getHeaders(headers);
- newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER);
-
- newHeaders.put(DO_AS_HEADER, viewContext.getUsername());
- newHeaders.put("Accept", MediaType.APPLICATION_JSON);
- if (customHeaders != null) {
- newHeaders.putAll(customHeaders);
- }
- LOGGER.info(String.format("Proxy request for url: [%s] %s", method,
- urlToRead));
-
- return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders);
- }
+ return getErrorDetails(errorCode, errorMessage, ex);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
index f002102..9791c47 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
@@ -32,92 +32,144 @@ import org.w3c.dom.Element;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
public class OozieUtils {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(OozieUtils.class);
- private Utils utils = new Utils();
-
- public String generateConfigXml(Map<String, String> map) {
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- DocumentBuilder db;
- try {
- db = dbf.newDocumentBuilder();
- Document doc = db.newDocument();
- Element configElement = doc.createElement("configuration");
- doc.appendChild(configElement);
- for (Map.Entry<String, String> entry : map.entrySet()) {
- Element propElement = doc.createElement("property");
- configElement.appendChild(propElement);
- Element nameElem = doc.createElement("name");
- nameElem.setTextContent(entry.getKey());
- Element valueElem = doc.createElement("value");
- valueElem.setTextContent(entry.getValue());
- propElement.appendChild(nameElem);
- propElement.appendChild(valueElem);
- }
- return utils.generateXml(doc);
- } catch (ParserConfigurationException e) {
- LOGGER.error("error in generating config xml", e);
- throw new RuntimeException(e);
- }
- }
- public String getJobPathPropertyKey(JobType jobType) {
- switch (jobType) {
- case WORKFLOW:
- return "oozie.wf.application.path";
- case COORDINATOR:
- return "oozie.coord.application.path";
- case BUNDLE:
- return "oozie.bundle.application.path";
- }
- throw new RuntimeException("Unknown Job Type");
- }
-
- public String generateWorkflowXml(String actionNodeXml) {
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- DocumentBuilder db;
- try {
- db = dbf.newDocumentBuilder();
- Document doc = db.newDocument();
-
- Element workflowElement = doc.createElement("workflow-app");
- workflowElement.setAttribute("name", "testWorkflow");
- workflowElement.setAttribute("xmlns", "uri:oozie:workflow:0.5");
- doc.appendChild(workflowElement);
-
- Element startElement = doc.createElement("start");
- startElement.setAttribute("to", "testAction");
- workflowElement.appendChild(startElement);
-
- Element actionElement = doc.createElement("action");
- actionElement.setAttribute("name", "testAction");
- Element actionSettingsElement = db.parse(new InputSource(new StringReader(actionNodeXml))).getDocumentElement();
- actionElement.appendChild(doc.importNode(actionSettingsElement, true));
- workflowElement.appendChild(actionElement);
-
- Element actionOkTransitionElement = doc.createElement("ok");
- actionOkTransitionElement.setAttribute("to", "end");
- actionElement.appendChild(actionOkTransitionElement);
-
- Element actionErrorTransitionElement = doc.createElement("error");
- actionErrorTransitionElement.setAttribute("to", "kill");
- actionElement.appendChild(actionErrorTransitionElement);
-
- Element killElement = doc.createElement("kill");
- killElement.setAttribute("name", "kill");
- Element killMessageElement = doc.createElement("message");
- killMessageElement.setTextContent("Kill node message");
- killElement.appendChild(killMessageElement);
- workflowElement.appendChild(killElement);
-
- Element endElement = doc.createElement("end");
- endElement.setAttribute("name", "end");
- workflowElement.appendChild(endElement);
-
- return utils.generateXml(doc);
- } catch (ParserConfigurationException | SAXException | IOException e) {
- LOGGER.error("error in generating workflow xml", e);
- throw new RuntimeException(e);
- }
- }
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(OozieUtils.class);
+ private Utils utils = new Utils();
+
+ public String generateConfigXml(Map<String, String> map) {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db;
+ try {
+ db = dbf.newDocumentBuilder();
+ Document doc = db.newDocument();
+ Element configElement = doc.createElement("configuration");
+ doc.appendChild(configElement);
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ Element propElement = doc.createElement("property");
+ configElement.appendChild(propElement);
+ Element nameElem = doc.createElement("name");
+ nameElem.setTextContent(entry.getKey());
+ Element valueElem = doc.createElement("value");
+ valueElem.setTextContent(entry.getValue());
+ propElement.appendChild(nameElem);
+ propElement.appendChild(valueElem);
+ }
+ return utils.generateXml(doc);
+ } catch (ParserConfigurationException e) {
+ LOGGER.error("error in generating config xml", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getJobPathPropertyKey(JobType jobType) {
+ switch (jobType) {
+ case WORKFLOW:
+ return "oozie.wf.application.path";
+ case COORDINATOR:
+ return "oozie.coord.application.path";
+ case BUNDLE:
+ return "oozie.bundle.application.path";
+ }
+ throw new RuntimeException("Unknown Job Type");
+ }
+
+ public JobType deduceJobType(String xml) {
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = null;
+
+ db = dbf.newDocumentBuilder();
+ InputSource is = new InputSource();
+ is.setCharacterStream(new StringReader(xml));
+
+ Document doc = db.parse(is);
+ String rootNode = doc.getDocumentElement().getNodeName();
+ if ("workflow-app".equals(rootNode)) {
+ return JobType.WORKFLOW;
+ } else if ("coordinator-app".equals(rootNode)) {
+ return JobType.COORDINATOR;
+ } else if ("bundle-app".equals(rootNode)) {
+ return JobType.BUNDLE;
+ }
+ throw new RuntimeException("invalid xml submitted");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String deduceWorkflowNameFromJson(String json) {
+ JsonElement jsonElement = new JsonParser().parse(json);
+ String name = jsonElement.getAsJsonObject().get("name").getAsString();
+ return name;
+ }
+
+ public String deduceWorkflowNameFromXml(String xml) {
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = dbf.newDocumentBuilder();
+ InputSource is = new InputSource();
+ is.setCharacterStream(new StringReader(xml));
+ Document doc = db.parse(is);
+ String name = doc.getDocumentElement().getAttributeNode("name").getValue();
+ return name;
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String generateWorkflowXml(String actionNodeXml) {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db;
+ try {
+ db = dbf.newDocumentBuilder();
+ Document doc = db.newDocument();
+
+ Element workflowElement = doc.createElement("workflow-app");
+ workflowElement.setAttribute("name", "testWorkflow");
+ workflowElement.setAttribute("xmlns", "uri:oozie:workflow:0.5");
+ doc.appendChild(workflowElement);
+
+ Element startElement = doc.createElement("start");
+ startElement.setAttribute("to", "testAction");
+ workflowElement.appendChild(startElement);
+
+ Element actionElement = doc.createElement("action");
+ actionElement.setAttribute("name", "testAction");
+ Element actionSettingsElement = db.parse(
+ new InputSource(new StringReader(actionNodeXml)))
+ .getDocumentElement();
+ actionElement.appendChild(doc.importNode(actionSettingsElement,
+ true));
+ workflowElement.appendChild(actionElement);
+
+ Element actionOkTransitionElement = doc.createElement("ok");
+ actionOkTransitionElement.setAttribute("to", "end");
+ actionElement.appendChild(actionOkTransitionElement);
+
+ Element actionErrorTransitionElement = doc.createElement("error");
+ actionErrorTransitionElement.setAttribute("to", "kill");
+ actionElement.appendChild(actionErrorTransitionElement);
+
+ Element killElement = doc.createElement("kill");
+ killElement.setAttribute("name", "kill");
+ Element killMessageElement = doc.createElement("message");
+ killMessageElement.setTextContent("Kill node message");
+ killElement.appendChild(killMessageElement);
+ workflowElement.appendChild(killElement);
+
+ Element endElement = doc.createElement("end");
+ endElement.setAttribute("name", "end");
+ workflowElement.appendChild(endElement);
+
+ return utils.generateXml(doc);
+ } catch (ParserConfigurationException | SAXException | IOException e) {
+ LOGGER.error("error in generating workflow xml", e);
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java
new file mode 100644
index 0000000..3a57d63
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.ambari.view;
+
+import java.util.HashMap;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.simple.JSONObject;
+
+public class ServiceFormattedException extends WebApplicationException {
+ private final static int STATUS = 500;
+
+ public ServiceFormattedException(Throwable exception) {
+ super(errorEntity(exception.getMessage(), exception));
+ }
+
+ public ServiceFormattedException(String message, Throwable exception) {
+ super(errorEntity(message, exception));
+ }
+
+ protected static Response errorEntity(String message, Throwable e) {
+ HashMap<String, Object> response = new HashMap<String, Object>();
+ response.put("message", message);
+ String trace = null;
+ if (e != null) {
+ trace = ExceptionUtils.getStackTrace(e);
+ }
+ response.put("trace", trace);
+ response.put("status", STATUS);
+ return Response.status(STATUS).entity(new JSONObject(response))
+ .type(MediaType.APPLICATION_JSON).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
index 01bde47..f98df0d 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,98 +19,104 @@ package org.apache.oozie.ambari.view;
import java.io.IOException;
import java.io.InputStream;
+
import org.apache.hadoop.fs.FileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WorkflowFilesService {
- private HDFSFileUtils hdfsFileUtils;
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(WorkflowFilesService.class);
+ private HDFSFileUtils hdfsFileUtils;
+
+ public WorkflowFilesService(HDFSFileUtils hdfsFileUtils) {
+ super();
+ this.hdfsFileUtils = hdfsFileUtils;
+ }
+
+ public String createWorkflowFile(String appPath, String content,
+ boolean overwrite) throws IOException {
+ return hdfsFileUtils.writeToFile(getWorkflowFileName(appPath), content,
+ overwrite);
+ }
+
+ public String createAssetFile(String appPath, String content,
+ boolean overwrite) throws IOException {
+ return hdfsFileUtils.writeToFile(appPath, content,
+ overwrite);
+ }
- public WorkflowFilesService(HDFSFileUtils hdfsFileUtils) {
- super();
- this.hdfsFileUtils = hdfsFileUtils;
- }
+ public String saveDraft(String appPath, String content, boolean overwrite)
+ throws IOException {
+ return hdfsFileUtils.writeToFile(getWorkflowDrafFileName(appPath),
+ content, overwrite);
+ }
- public String createWorkflowFile(String appPath, String content,
- boolean overwrite) throws IOException {
- return hdfsFileUtils.writeToFile(getWorkflowFileName(appPath), content,
- overwrite);
- }
-
- public String createAssetFile(String appPath, String content,
- boolean overwrite) throws IOException {
- return hdfsFileUtils.writeToFile(appPath, content,
- overwrite);
- }
+ public InputStream readDraft(String appPath) throws IOException {
+ return hdfsFileUtils.read(getWorkflowDrafFileName(appPath));
+ }
- public String saveDraft(String appPath, String content, boolean overwrite)
- throws IOException {
- return hdfsFileUtils.writeToFile(getWorkflowDrafFileName(appPath),
- content, overwrite);
- }
+ public InputStream readWorkflowXml(String appPath) throws IOException {
+ return hdfsFileUtils.read(getWorkflowFileName(appPath));
+ }
- public InputStream readDraft(String appPath) throws IOException {
- return hdfsFileUtils.read(getWorkflowDrafFileName(appPath));
- }
- public InputStream readWorkflowXml(String appPath) throws IOException {
- return hdfsFileUtils.read(getWorkflowFileName(appPath));
- }
+ private String getWorkflowDrafFileName(String appPath) {
+ return getWorkflowFileName(appPath).concat(".draft.json");
+ }
- private String getWorkflowDrafFileName(String appPath) {
- return getWorkflowFileName(appPath).concat(".draft.json");
- }
+ private String getWorkflowFileName(String appPath) {
+ String workflowFile = null;
+ if (appPath.endsWith(".xml")) {
+ workflowFile = appPath;
+ } else {
+ workflowFile = appPath + (appPath.endsWith("/") ? "" : "/")
+ + "workflow.xml";
+ }
+ return workflowFile;
+ }
- private String getWorkflowFileName(String appPath) {
- String workflowFile = null;
- if (appPath.endsWith(".xml")) {
- workflowFile = appPath;
- } else {
- workflowFile = appPath + (appPath.endsWith("/") ? "" : "/")
- + "workflow.xml";
- }
- return workflowFile;
- }
-
- public String getAssetFileName(String appPath) {
- String assetFile = null;
- if (appPath.endsWith(".xml")) {
- assetFile = appPath;
- } else {
- assetFile = appPath + (appPath.endsWith("/") ? "" : "/")
- + "asset.xml";
- }
- return assetFile;
- }
+ public String getAssetFileName(String appPath) {
+ String assetFile = null;
+ if (appPath.endsWith(".xml")) {
+ assetFile = appPath;
+ } else {
+ assetFile = appPath + (appPath.endsWith("/") ? "" : "/")
+ + "asset.xml";
+ }
+ return assetFile;
+ }
- public void discardDraft(String workflowPath) throws IOException {
- hdfsFileUtils.deleteFile(getWorkflowDrafFileName(workflowPath));
+ public void discardDraft(String workflowPath) throws IOException {
+ hdfsFileUtils.deleteFile(getWorkflowDrafFileName(workflowPath));
- }
+ }
- public WorkflowFileInfo getWorkflowDetails(String appPath) {
- WorkflowFileInfo workflowInfo = new WorkflowFileInfo();
- workflowInfo.setWorkflowPath(getWorkflowFileName(appPath));
- boolean draftExists = hdfsFileUtils
- .fileExists(getWorkflowDrafFileName(appPath));
- workflowInfo.setDraftExists(draftExists);
- boolean workflowExists = hdfsFileUtils.fileExists(getWorkflowFileName(appPath));
- FileStatus workflowFileStatus = null;
- if (workflowExists){
- workflowFileStatus = hdfsFileUtils
- .getFileStatus(getWorkflowFileName(appPath));
- workflowInfo.setWorkflowModificationTime(workflowFileStatus
- .getModificationTime());
- }
- if (draftExists) {
- FileStatus draftFileStatus = hdfsFileUtils
- .getFileStatus(getWorkflowDrafFileName(appPath));
- workflowInfo.setDraftModificationTime(draftFileStatus
- .getModificationTime());
- if (!workflowExists){
- workflowInfo.setIsDraftCurrent(true);
- }else{
- workflowInfo.setIsDraftCurrent(draftFileStatus.getModificationTime()
- - workflowFileStatus.getModificationTime() > 0);
- }
- }
- return workflowInfo;
- }
+ public WorkflowFileInfo getWorkflowDetails(String appPath) {
+ WorkflowFileInfo workflowInfo = new WorkflowFileInfo();
+ workflowInfo.setWorkflowPath(getWorkflowFileName(appPath));
+ boolean draftExists = hdfsFileUtils
+ .fileExists(getWorkflowDrafFileName(appPath));
+ workflowInfo.setDraftExists(draftExists);
+ boolean workflowExists = hdfsFileUtils.fileExists(getWorkflowFileName(appPath));
+ FileStatus workflowFileStatus = null;
+ if (workflowExists) {
+ workflowFileStatus = hdfsFileUtils
+ .getFileStatus(getWorkflowFileName(appPath));
+ workflowInfo.setWorkflowModificationTime(workflowFileStatus
+ .getModificationTime());
+ }
+ if (draftExists) {
+ FileStatus draftFileStatus = hdfsFileUtils
+ .getFileStatus(getWorkflowDrafFileName(appPath));
+ workflowInfo.setDraftModificationTime(draftFileStatus
+ .getModificationTime());
+ if (!workflowExists) {
+ workflowInfo.setIsDraftCurrent(true);
+ } else {
+ workflowInfo.setIsDraftCurrent(draftFileStatus.getModificationTime()
+ - workflowFileStatus.getModificationTime() > 0);
+ }
+ }
+ return workflowInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java
new file mode 100644
index 0000000..cebc7ea
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.assets;
+
+import org.apache.ambari.view.DataStore;
+import org.apache.oozie.ambari.view.assets.model.ActionAssetDefinition;
+import org.apache.oozie.ambari.view.repo.BaseRepo;
+
+public class AssetDefinitionRepo extends BaseRepo<ActionAssetDefinition> {
+ public AssetDefinitionRepo(DataStore dataStore) {
+ super(ActionAssetDefinition.class, dataStore);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
index 1390ec0..df936a4 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,26 +17,23 @@
*/
package org.apache.oozie.ambari.view.assets;
-import java.util.Collection;
-
import org.apache.ambari.view.DataStore;
+import org.apache.ambari.view.PersistenceException;
import org.apache.oozie.ambari.view.assets.model.ActionAsset;
+import org.apache.oozie.ambari.view.repo.BaseRepo;
-public class AssetRepo {
- private final DataStore dataStore;
+import java.util.Collection;
- public AssetRepo(DataStore dataStore) {
- super();
- this.dataStore = dataStore;
- }
- public void saveAsset(ActionAsset asset){
-
- }
- public void deleteAsset(ActionAsset asset){
-
- }
- public Collection<ActionAsset>listAllAssets(){
- return null;
- }
+public class AssetRepo extends BaseRepo<ActionAsset> {
+ public AssetRepo(DataStore dataStore) {
+ super(ActionAsset.class, dataStore);
+ }
+ public Collection<ActionAsset> getMyAsets(String userName) {
+ try {
+ return dataStore.findAll(ActionAsset.class, " owner='" + userName + "'");
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
}