You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ni...@apache.org on 2016/12/23 10:43:01 UTC
[1/2] ambari git commit: AMBARI-19256 : Asset support Rest API
(Belliraj HB via nitirajrathore)
Repository: ambari
Updated Branches:
refs/heads/trunk e90009310 -> 47a988245
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetResource.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetResource.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetResource.java
new file mode 100644
index 0000000..0622971
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetResource.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.assets;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import org.apache.ambari.view.ViewContext;
+import org.apache.oozie.ambari.view.*;
+import org.apache.oozie.ambari.view.assets.model.ActionAsset;
+import org.apache.oozie.ambari.view.assets.model.ActionAssetDefinition;
+import org.apache.oozie.ambari.view.assets.model.AssetDefintion;
+import org.apache.oozie.ambari.view.model.APIResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.*;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.oozie.ambari.view.Constants.*;
+
+public class AssetResource {
+
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(AssetResource.class);
+ private final AssetService assetService;
+ private final ViewContext viewContext;
+ private final HDFSFileUtils hdfsFileUtils;
+ private OozieUtils oozieUtils = new OozieUtils();
+ private final OozieDelegate oozieDelegate;
+
+
+ public AssetResource(ViewContext viewContext) {
+ this.viewContext = viewContext;
+ this.assetService = new AssetService(viewContext);
+ hdfsFileUtils = new HDFSFileUtils(viewContext);
+ oozieDelegate = new OozieDelegate(viewContext);
+ }
+
+ @GET
+ public Response getAssets() {
+ try {
+ Collection<ActionAsset> assets = assetService.getAssets();
+ APIResult result = new APIResult();
+ result.setStatus(APIResult.Status.SUCCESS);
+ result.getPaging().setTotal(assets != null ? assets.size() : 0L);
+ result.setData(assets);
+ return Response.ok(result).build();
+ } catch (Exception e) {
+ throw new ServiceFormattedException(e);
+ }
+ }
+
+ @GET
+ @Path("/mine")
+ public Response getMyAssets() {
+ try {
+ Collection<ActionAsset> assets = assetService.getMyAssets();
+ APIResult result = new APIResult();
+ result.setStatus(APIResult.Status.SUCCESS);
+ result.getPaging().setTotal(assets != null ? assets.size() : 0L);
+ result.setData(assets);
+ return Response.ok(result).build();
+ } catch (Exception e) {
+ throw new ServiceFormattedException(e);
+ }
+ }
+ @POST
+ public Response saveAsset(@Context HttpHeaders headers,
+ @QueryParam("id") String id, @Context UriInfo ui, String body) {
+ try {
+ Gson gson = new Gson();
+ AssetDefintion assetDefinition = gson.fromJson(body,
+ AssetDefintion.class);
+ Map<String, String> validateAsset = validateAsset(headers,
+ assetDefinition.getDefinition(), ui.getQueryParameters());
+ if (!STATUS_OK.equals(validateAsset.get(STATUS_KEY))) {
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+ assetService.saveAsset(id, viewContext.getUsername(), assetDefinition);
+ APIResult result = new APIResult();
+ result.setStatus(APIResult.Status.SUCCESS);
+ return Response.ok(result).build();
+ } catch (Exception e) {
+ throw new ServiceFormattedException(e);
+ }
+ }
+
+ private List<String> getAsList(String string) {
+ ArrayList<String> li = new ArrayList<>(1);
+ li.add(string);
+ return li;
+ }
+
+ public Map<String, String> validateAsset(HttpHeaders headers,
+ String postBody, MultivaluedMap<String, String> queryParams) {
+ String workflowXml = oozieUtils.generateWorkflowXml(postBody);
+ try {
+ Map<String, String> result = new HashMap<>();
+ String tempWfPath = "/tmp" + "/tmpooziewfs/tempwf.xml";
+ hdfsFileUtils.writeToFile(tempWfPath, workflowXml, true);
+ queryParams.put("oozieparam.action", getAsList("dryrun"));
+ queryParams.put("oozieconfig.rerunOnFailure", getAsList("false"));
+ queryParams.put("oozieconfig.useSystemLibPath", getAsList("true"));
+ queryParams.put("resourceManager", getAsList("useDefault"));
+ String dryRunResp = oozieDelegate.submitWorkflowJobToOozie(headers,
+ tempWfPath, queryParams, JobType.WORKFLOW);
+ LOGGER.info(String.format("resp from validating asset=[%s]",
+ dryRunResp));
+ if (dryRunResp != null && dryRunResp.trim().startsWith("{")) {
+ JsonElement jsonElement = new JsonParser().parse(dryRunResp);
+ JsonElement idElem = jsonElement.getAsJsonObject().get("id");
+ if (idElem != null) {
+ result.put(STATUS_KEY, STATUS_OK);
+ } else {
+ result.put(STATUS_KEY, STATUS_FAILED);
+ result.put(MESSAGE_KEY, dryRunResp);
+ }
+ } else {
+ result.put(STATUS_KEY, STATUS_FAILED);
+ result.put(MESSAGE_KEY, dryRunResp);
+ }
+ return result;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @GET
+ @Path("/{id}")
+ public Response getAssetDetail(@PathParam("id") String id) {
+ try {
+ AssetDefintion assetDefinition = assetService.getAssetDetail(id);
+ APIResult result = new APIResult();
+ result.setStatus(APIResult.Status.SUCCESS);
+ result.setData(assetDefinition);
+ return Response.ok(result).build();
+ } catch (Exception e) {
+ throw new ServiceFormattedException(e);
+ }
+ }
+
+ @GET
+ @Path("/definition/id}")
+ public Response getAssetDefinition(@PathParam("defnitionId") String id) {
+ try {
+ ActionAssetDefinition assetDefinition = assetService
+ .getAssetDefinition(id);
+ APIResult result = new APIResult();
+ result.setStatus(APIResult.Status.SUCCESS);
+ result.setData(assetDefinition);
+ return Response.ok(result).build();
+ } catch (Exception e) {
+ throw new ServiceFormattedException(e);
+ }
+ }
+
+ @DELETE
+ @Path("/{id}")
+ public Response delete(@PathParam("id") String id) {
+ try {
+ ActionAsset asset = assetService.getAsset(id);
+ if (asset == null) {
+ throw new RuntimeException("Asset doesnt exist");
+ }
+ if (!viewContext.getUsername().equals(asset.getOwner())){
+ throw new RuntimeException(
+ "Dont have permission to delete this asset");
+ }
+ assetService.deleteAsset(id);
+ APIResult result = new APIResult();
+ result.setStatus(APIResult.Status.SUCCESS);
+ return Response.ok(result).build();
+ } catch (Exception e) {
+ throw new ServiceFormattedException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetService.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetService.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetService.java
index 648a89f..9fe2f9c 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetService.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,31 +21,95 @@ import java.util.Collection;
import org.apache.ambari.view.ViewContext;
import org.apache.oozie.ambari.view.assets.model.ActionAsset;
+import org.apache.oozie.ambari.view.assets.model.ActionAssetDefinition;
+import org.apache.oozie.ambari.view.assets.model.AssetDefintion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AssetService {
- private AssetRepo assetRepo;
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(AssetService.class);
+ private final AssetRepo assetRepo;
+ private final AssetDefinitionRepo assetDefinitionRepo;
- public AssetService(ViewContext viewContext) {
- super();
- assetRepo = new AssetRepo(viewContext.getDataStore());
- }
+ private final ViewContext viewContext;
- public Collection<ActionAsset> getAssets() {
- return null;
- }
+ public AssetService(ViewContext viewContext) {
+ super();
+ this.viewContext = viewContext;
- public Collection<ActionAsset> getPrioritizedAssets() {
- Collection<ActionAsset> assets = getAssets();
- // reorder
- return assets;
- }
+ this.assetDefinitionRepo = new AssetDefinitionRepo(
+ viewContext.getDataStore());
+ this.assetRepo = new AssetRepo(viewContext.getDataStore());
- public void addAsset() {
+ }
- }
+ public Collection<ActionAsset> getAssets() {
+ return assetRepo.findAll();
+ }
- public void deleteAsset() {
+ public Collection<ActionAsset> getPrioritizedAssets() {
+ Collection<ActionAsset> assets = getAssets();
+ return assets;
+ }
+ public void saveAsset(String assetId, String userName,
+ AssetDefintion assetDefinition) {
+ if (assetId != null) {
+ ActionAsset actionAsset = assetRepo.findById(assetId);
+ if (actionAsset == null) {
+ throw new RuntimeException("could not find asset with id :"
+ + assetId);
+ }
+ actionAsset.setDescription(assetDefinition.getDescription());
+ actionAsset.setName(assetDefinition.getName());
+ actionAsset.setType(assetDefinition.getType());
+ ActionAssetDefinition assetDefinintion = assetDefinitionRepo
+ .findById(actionAsset.getDefinitionRef());
+ assetDefinintion.setData(assetDefinintion.getData());
+ assetDefinitionRepo.update(assetDefinintion);
+ assetRepo.update(actionAsset);
+ } else {
+ ActionAsset actionAsset = new ActionAsset();
+ actionAsset.setOwner(userName);
+ ActionAssetDefinition definition = new ActionAssetDefinition();
+ definition.setData(assetDefinition.getDefinition());
+ ActionAssetDefinition createdDefinition = assetDefinitionRepo
+ .create(definition);
+ actionAsset.setDefinitionRef(createdDefinition.getId());
+ actionAsset.setDescription(assetDefinition.getDescription());
+ actionAsset.setName(assetDefinition.getName());
+ actionAsset.setType(assetDefinition.getType());
+ assetRepo.create(actionAsset);
}
+ }
+
+
+ public void deleteAsset(String id) {
+ assetRepo.deleteById(id);
+ }
+
+ public AssetDefintion getAssetDetail(String assetId) {
+ AssetDefintion ad = new AssetDefintion();
+ ActionAsset actionAsset = assetRepo.findById(assetId);
+ ActionAssetDefinition actionDefinition = assetDefinitionRepo
+ .findById(actionAsset.getDefinitionRef());
+ ad.setDefinition(actionDefinition.getData());
+ ad.setDescription(actionAsset.getDescription());
+ ad.setName(actionAsset.getName());
+ return ad;
+ }
+
+ public ActionAssetDefinition getAssetDefinition(String assetDefintionId) {
+ return assetDefinitionRepo.findById(assetDefintionId);
+ }
+
+ public ActionAsset getAsset(String id) {
+ return assetRepo.findById(id);
+ }
+
+ public Collection<ActionAsset> getMyAssets() {
+ return assetRepo.getMyAsets(viewContext.getUsername());
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAsset.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAsset.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAsset.java
index 8eb6081..200a4aa 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAsset.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAsset.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,44 +17,74 @@
*/
package org.apache.oozie.ambari.view.assets.model;
+import org.apache.oozie.ambari.view.AssetDefinitionRefType;
+import org.apache.oozie.ambari.view.EntityStatus;
import org.apache.oozie.ambari.view.model.BaseModel;
+import org.apache.oozie.ambari.view.model.Indexed;
-public class ActionAsset extends BaseModel {
- private String id;
- private String name;
- private String description;
- private String assetLocation;
- private String type;
-
- public String getId() {
- return id;
- }
- public void setId(String id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public String getDescription() {
- return description;
- }
- public void setDescription(String description) {
- this.description = description;
- }
-
- public String getType() {
- return type;
- }
- public void setType(String type) {
- this.type = type;
- }
- public String getAssetLocation() {
- return assetLocation;
- }
- public void setAssetLocation(String assetLocation) {
- this.assetLocation = assetLocation;
- }
+public class ActionAsset extends BaseModel implements Indexed {
+ private static final long serialVersionUID = 1L;
+ private String id;
+ private String name;
+ private String description;
+ private String type;
+ private String definitionRefType = AssetDefinitionRefType.DB.name();//can be db or fs
+ private String definitionRef;//point to dbid or filesystem
+ private String status = EntityStatus.DRAFT.name();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getDefinitionRef() {
+ return definitionRef;
+ }
+
+ public void setDefinitionRef(String definitionRef) {
+ this.definitionRef = definitionRef;
+ }
+
+ public String getDefinitionRefType() {
+ return definitionRefType;
+ }
+
+ public void setDefinitionRefType(String definitionRefType) {
+ this.definitionRefType = definitionRefType;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAssetDefinition.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAssetDefinition.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAssetDefinition.java
new file mode 100644
index 0000000..0c6e630
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/ActionAssetDefinition.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.assets.model;
+
+import org.apache.oozie.ambari.view.model.Indexed;
+
+public class ActionAssetDefinition implements Indexed {
+ private String id;
+ private String data;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getData() {
+ return data;
+ }
+
+ public void setData(String data) {
+ this.data = data;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/AssetDefintion.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/AssetDefintion.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/AssetDefintion.java
new file mode 100644
index 0000000..e80be27
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/model/AssetDefintion.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.assets.model;
+
+import org.apache.oozie.ambari.view.AssetDefinitionRefType;
+
+public class AssetDefintion {
+ private String definition = AssetDefinitionRefType.DB.name();
+ private String type;
+ private String name;
+ private String description;
+ private String status;
+
+ public String getDefinition() {
+ return definition;
+ }
+
+ public void setDefinition(String definition) {
+ this.definition = definition;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/APIResult.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/APIResult.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/APIResult.java
new file mode 100644
index 0000000..2a8075a
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/APIResult.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.model;
+
+public class APIResult {
+
+ private Status status;
+ private Object data;
+ private Paging paging = new Paging();
+ ;
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+
+ public Object getData() {
+ return data;
+ }
+
+ public void setData(Object data) {
+ this.data = data;
+ }
+
+ public Paging getPaging() {
+ return paging;
+
+ }
+
+ public void setPaging(Paging paging) {
+ this.paging = paging;
+ }
+
+
+ public static enum Status {
+ SUCCESS,
+ ERROR
+ }
+
+ public static void main(String[] args) {
+ System.out.println("hello");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/BaseModel.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/BaseModel.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/BaseModel.java
index 553ce24..76d4d84 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/BaseModel.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/BaseModel.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,29 +20,34 @@ package org.apache.oozie.ambari.view.model;
import java.io.Serializable;
-public class BaseModel implements Serializable{
- private static final long serialVersionUID = 1L;
- private String createdAt;
- private String updatedAt;
- private String owner;
-
- public String getCreatedAt() {
- return createdAt;
- }
- public void setCreatedAt(String createdAt) {
- this.createdAt = createdAt;
- }
- public String getUpdatedAt() {
- return updatedAt;
- }
- public void setUpdatedAt(String updatedAt) {
- this.updatedAt = updatedAt;
- }
- public String getOwner() {
- return owner;
- }
- public void setOwner(String owner) {
- this.owner = owner;
- }
-
+public class BaseModel implements When, Serializable {
+ private static final long serialVersionUID = 1L;
+ private String createdAt;
+ private String updatedAt;
+ private String owner;
+
+ public String getCreatedAt() {
+ return createdAt;
+ }
+
+ public void setCreatedAt(String createdAt) {
+ this.createdAt = createdAt;
+ }
+
+ public String getUpdatedAt() {
+ return updatedAt;
+ }
+
+ public void setUpdatedAt(String updatedAt) {
+ this.updatedAt = updatedAt;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Indexed.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Indexed.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Indexed.java
new file mode 100644
index 0000000..b391cbe
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Indexed.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.model;
+
+public interface Indexed {
+ String getId();
+
+ void setId(String id);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Paging.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Paging.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Paging.java
new file mode 100644
index 0000000..f6f1468
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/Paging.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.model;
+
+public class Paging {
+ private Long total;
+
+ public Long getTotal() {
+ return total;
+ }
+
+ public void setTotal(Long total) {
+ this.total = total;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/When.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/When.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/When.java
new file mode 100644
index 0000000..aa39c84
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/model/When.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.model;
+
+public interface When {
+ String getCreatedAt();
+
+ void setCreatedAt(String createdAt);
+
+ String getUpdatedAt();
+
+ void setUpdatedAt(String updatedAt);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/repo/BaseRepo.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/repo/BaseRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/repo/BaseRepo.java
new file mode 100644
index 0000000..c05f475
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/repo/BaseRepo.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.repo;
+
+import java.util.Collection;
+import java.util.Date;
+
+import org.apache.ambari.view.DataStore;
+import org.apache.ambari.view.PersistenceException;
+import org.apache.oozie.ambari.view.model.Indexed;
+import org.apache.oozie.ambari.view.model.When;
+
+public class BaseRepo<T> {
+
+ protected final DataStore dataStore;
+ private final Class type;
+
+ public BaseRepo(Class type, DataStore dataStore) {
+ this.type = type;
+ this.dataStore = dataStore;
+ }
+
+ public String generateId() {
+ return java.util.UUID.randomUUID().toString();
+ }
+
+ public Collection<T> findAll() {
+ try {
+ return dataStore.findAll(type, null);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public T findById(String id) {
+ try {
+ return (T) dataStore.find(type, id);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public T create(T obj) {
+ try {
+ if (obj instanceof Indexed) {
+ Indexed idxObj = (Indexed) obj;
+ if (idxObj.getId() == null) {
+ idxObj.setId(this.generateId());
+ } else {
+ T findById = findById(idxObj.getId());
+ if (findById != null) {
+ throw new RuntimeException("Object already exist in db");
+ }
+ }
+ }
+ if (obj instanceof When) {
+ Date now = new Date();
+ When when = (When) obj;
+ when.setCreatedAt(String.valueOf(now.getTime()));
+ when.setUpdatedAt(String.valueOf(now.getTime()));
+ }
+ this.dataStore.store(obj);
+ return obj;
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void update(T obj) {
+ try {
+ if (obj instanceof When) {
+ Date now = new Date();
+ When when = (When) obj;
+ when.setUpdatedAt(String.valueOf(now.getTime()));
+ }
+ this.dataStore.store(obj);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void delete(T obj) {
+ try {
+ this.dataStore.remove(obj);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void deleteById(String id) {
+ try {
+ T findById = this.findById(id);
+ this.dataStore.remove(findById);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowManagerService.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowManagerService.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowManagerService.java
index 4c88454..afdee9e 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowManagerService.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowManagerService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,52 +25,64 @@ import org.apache.ambari.view.ViewContext;
import org.apache.oozie.ambari.view.HDFSFileUtils;
import org.apache.oozie.ambari.view.JobType;
import org.apache.oozie.ambari.view.workflowmanager.model.Workflow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WorkflowManagerService {
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(WorkflowManagerService.class);
+ private final WorkflowsRepo workflowsRepository;
+ private final HDFSFileUtils hdfsFileUtils;
- private WorkflowsRepo workflowsRepository;
- private HDFSFileUtils hdfsFileUtils;
+ public WorkflowManagerService(ViewContext viewContext) {
+ workflowsRepository = new WorkflowsRepo(viewContext.getDataStore());
+ hdfsFileUtils = new HDFSFileUtils(viewContext);
+ }
- public WorkflowManagerService(ViewContext viewContext) {
- workflowsRepository = new WorkflowsRepo(viewContext.getDataStore());
- hdfsFileUtils = new HDFSFileUtils(viewContext);
+ public void saveWorkflow(String projectId, String path, JobType jobType,
+ String descripton, String userName, String name) {
+ LOGGER.debug("save workflow called");
+ if (projectId != null) {
+ Workflow workflowById = workflowsRepository.findById(projectId);
+ if (workflowById == null) {
+ throw new RuntimeException("could not find project with id :"
+ + projectId);
+ }
+ setWorkflowAttributes(jobType, userName, name, workflowById);
+ workflowsRepository.update(workflowById);
+ } else {
+ Workflow wf = new Workflow();
+ wf.setId(workflowsRepository.generateId());
+ setWorkflowAttributes(jobType, userName, name, wf);
+ wf.setWorkflowDefinitionPath(path);
+ workflowsRepository.create(wf);
}
+ }
- public void saveWorkflow(String path, JobType jobType, String descripton,
- String userName) {
- // workflowsRepository.getWorkflow(path);
- Workflow workflowByPath = getWorkflowByPath(path);
- if (workflowByPath == null) {
- Workflow wf = new Workflow();
- wf.setOwner(userName);
- wf.setType(jobType.name());
- wf.setWorkflowDefinitionPath(path);
- Date now = new Date();
- wf.setUpdatedAt(String.valueOf(now.getTime()));
- workflowsRepository.updateWorkflow(wf);
- } else {
- Date now = new Date();
- workflowByPath.setUpdatedAt(String.valueOf(now.getTime()));
- workflowsRepository.updateWorkflow(workflowByPath);
- }
- }
+ private void setWorkflowAttributes(JobType jobType, String userName,
+ String name, Workflow wf) {
+ wf.setOwner(userName);
+ wf.setName(name);
+ wf.setType(jobType.name());
+ }
- public Collection<Workflow> getAllWorkflows() {
- return workflowsRepository.getAllWorkflows();
- }
+ public Collection<Workflow> getAllWorkflows() {
+ return workflowsRepository.findAll();
+ }
- public Workflow getWorkflowByPath(String path) {
- return workflowsRepository.getWorkflow(path);
- }
+ public Workflow getWorkflowByPath(String path) {
+ return workflowsRepository.getWorkflowByPath(path);
+ }
- public void deleteWorkflow(String path, Boolean deleteDefinition) {
- if (deleteDefinition) {
- try {
- hdfsFileUtils.deleteFile(path);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- workflowsRepository.deleteWorkflow(path);
+ public void deleteWorkflow(String projectId, Boolean deleteDefinition) {
+ Workflow workflow = workflowsRepository.findById(projectId);
+ if (deleteDefinition) {
+ try {
+ hdfsFileUtils.deleteFile(workflow.getWorkflowDefinitionPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
+ workflowsRepository.delete(workflow);
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsManagerResource.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsManagerResource.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsManagerResource.java
index 17a3296..64e2060a 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsManagerResource.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsManagerResource.java
@@ -23,6 +23,8 @@ import java.util.Map;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import org.apache.ambari.view.ViewContext;
@@ -44,8 +46,10 @@ public class WorkflowsManagerResource {
@DELETE
- public void deleteWorkflow( @QueryParam("worfkflowPath") String path,
+ @Path("/projectId")
+ public void deleteWorkflow( @PathParam("projectId") String id,
@DefaultValue("false") @QueryParam("deleteDefinition") Boolean deleteDefinition){
- workflowManagerService.deleteWorkflow(path,deleteDefinition);
+ workflowManagerService.deleteWorkflow(id,deleteDefinition);
}
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsRepo.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsRepo.java
index 978c059..7787bda 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsRepo.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/WorkflowsRepo.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,53 +17,24 @@
*/
package org.apache.oozie.ambari.view.workflowmanager;
-import java.util.Collection;
-
import org.apache.ambari.view.DataStore;
import org.apache.ambari.view.PersistenceException;
+import org.apache.oozie.ambari.view.repo.BaseRepo;
import org.apache.oozie.ambari.view.workflowmanager.model.Workflow;
-public class WorkflowsRepo {
- private final DataStore dataStore;
+public class WorkflowsRepo extends BaseRepo<Workflow> {
- public WorkflowsRepo(DataStore dataStore) {
- super();
- this.dataStore=dataStore;
- }
- public Collection<Workflow> getAllWorkflows(){
- try {
- return dataStore.findAll(Workflow.class,null);
- } catch (PersistenceException e) {
- throw new RuntimeException(e);
- }
- }
- public void deleteWorkflow(String workflowPath){
- try {
- Workflow workflow = this.getWorkflow(workflowPath);
- this.dataStore.remove(workflow);
- } catch (PersistenceException e) {
- throw new RuntimeException(e);
- }
- }
- public void createWorkflow(Workflow wf){
- try {
- this.dataStore.store(wf);
- } catch (PersistenceException e) {
- throw new RuntimeException(e);
- }
- }
- public void updateWorkflow(Workflow wf){
- try {
- this.dataStore.store(wf);
- } catch (PersistenceException e) {
- throw new RuntimeException(e);
- }
- }
- public Workflow getWorkflow(String path) {
- try {
- return this.dataStore.find(Workflow.class, "workflowDefinitionPath='"+path+"'");
- } catch (PersistenceException e) {
- throw new RuntimeException(e);
- }
+ public WorkflowsRepo(DataStore dataStore) {
+ super(Workflow.class, dataStore);
+
+ }
+
+ public Workflow getWorkflowByPath(String path) {
+ try {
+ return this.dataStore.find(Workflow.class,
+ "workflowDefinitionPath='" + path + "'");
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/model/Workflow.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/model/Workflow.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/model/Workflow.java
index ad5f48b..cc19c80 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/model/Workflow.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/workflowmanager/model/Workflow.java
@@ -18,71 +18,72 @@
package org.apache.oozie.ambari.view.workflowmanager.model;
import org.apache.oozie.ambari.view.model.BaseModel;
+import org.apache.oozie.ambari.view.model.Indexed;
+
+public class Workflow extends BaseModel implements Indexed {
+ private static final long serialVersionUID = 1L;
+ private String id = null;
+ private String name;
+ private String desciption;
+ private String workflowDefinitionPath;
+ private String type;
+ private String isDraft;
+ private String definitionMissing;//true or not if path is fine.
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getWorkflowDefinitionPath() {
+ return workflowDefinitionPath;
+ }
+
+ public void setWorkflowDefinitionPath(String workflowDefinitionPath) {
+ this.workflowDefinitionPath = workflowDefinitionPath;
+ }
+
+ public String getIsDraft() {
+ return isDraft;
+ }
+
+ public void setIsDraft(String isDraft) {
+ this.isDraft = isDraft;
+ }
+
+ public String getDesciption() {
+ return desciption;
+ }
+
+ public void setDesciption(String desciption) {
+ this.desciption = desciption;
+ }
+
+ public String getDefinitionMissing() {
+ return definitionMissing;
+ }
+
+ public void setDefinitionMissing(String definitionMissing) {
+ this.definitionMissing = definitionMissing;
+ }
-public class Workflow extends BaseModel{
- private static final long serialVersionUID = 1L;
- private String id = null;
- private String name;
- private String desciption;
- private String workflowDefinitionPath;
- private String type;
- private String isDraft;
- private String definitionMissing;//true or not if path is fine.
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getWorkflowDefinitionPath() {
- return workflowDefinitionPath;
- }
-
- public void setWorkflowDefinitionPath(String workflowDefinitionPath) {
- this.workflowDefinitionPath = workflowDefinitionPath;
- }
-
- public String getIsDraft() {
- return isDraft;
- }
-
- public void setIsDraft(String isDraft) {
- this.isDraft = isDraft;
- }
-
- public String getDesciption() {
- return desciption;
- }
-
- public void setDesciption(String desciption) {
- this.desciption = desciption;
- }
-
- public String getDefinitionMissing() {
- return definitionMissing;
- }
-
- public void setDefinitionMissing(String definitionMissing) {
- this.definitionMissing = definitionMissing;
- }
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/resources/view.xml
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/resources/view.xml b/contrib/views/wfmanager/src/main/resources/view.xml
index bfa1253..b8e2fcc 100644
--- a/contrib/views/wfmanager/src/main/resources/view.xml
+++ b/contrib/views/wfmanager/src/main/resources/view.xml
@@ -120,6 +120,23 @@
<required>false</required>
</parameter>
+ <persistence>
+ <!--
+ <entity>
+ <class>org.apache.oozie.ambari.view.workflowmanager.model.Workflow</class>
+ <id-property>id</id-property>
+ </entity> -->
+ <entity>
+ <class>org.apache.oozie.ambari.view.assets.model.ActionAssetDefinition</class>
+ <id-property>id</id-property>
+ </entity>
+ <entity>
+ <class>org.apache.oozie.ambari.view.assets.model.ActionAsset</class>
+ <id-property>id</id-property>
+ </entity>
+
+ </persistence>
+
<!--<auto-instance>
<name>AUTO_OOZIE_VIEW</name>
<label>Oozie UI View</label>
[2/2] ambari git commit: AMBARI-19256 : Asset support Rest API
(Belliraj HB via nitirajrathore)
Posted by ni...@apache.org.
AMBARI-19256 : Asset support Rest API (Belliraj HB via nitirajrathore)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/47a98824
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/47a98824
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/47a98824
Branch: refs/heads/trunk
Commit: 47a98824566ad3015b9c9655a53b34e171f43ae9
Parents: e900093
Author: Nitiraj Rathore <ni...@gmail.com>
Authored: Fri Dec 23 16:11:12 2016 +0530
Committer: Nitiraj Rathore <ni...@gmail.com>
Committed: Fri Dec 23 16:12:35 2016 +0530
----------------------------------------------------------------------
.../ambari/view/AssetDefinitionRefType.java | 23 +
.../org/apache/oozie/ambari/view/Constants.java | 25 +
.../apache/oozie/ambari/view/EntityStatus.java | 23 +
.../apache/oozie/ambari/view/OozieDelegate.java | 243 ++++
.../ambari/view/OozieProxyImpersonator.java | 1087 ++++++++----------
.../apache/oozie/ambari/view/OozieUtils.java | 226 ++--
.../ambari/view/ServiceFormattedException.java | 53 +
.../oozie/ambari/view/WorkflowFilesService.java | 176 +--
.../ambari/view/assets/AssetDefinitionRepo.java | 29 +
.../oozie/ambari/view/assets/AssetRepo.java | 37 +-
.../oozie/ambari/view/assets/AssetResource.java | 197 ++++
.../oozie/ambari/view/assets/AssetService.java | 102 +-
.../ambari/view/assets/model/ActionAsset.java | 112 +-
.../assets/model/ActionAssetDefinition.java | 42 +
.../view/assets/model/AssetDefintion.java | 69 ++
.../oozie/ambari/view/model/APIResult.java | 63 +
.../oozie/ambari/view/model/BaseModel.java | 61 +-
.../apache/oozie/ambari/view/model/Indexed.java | 24 +
.../apache/oozie/ambari/view/model/Paging.java | 30 +
.../apache/oozie/ambari/view/model/When.java | 28 +
.../apache/oozie/ambari/view/repo/BaseRepo.java | 113 ++
.../workflowmanager/WorkflowManagerService.java | 94 +-
.../WorkflowsManagerResource.java | 8 +-
.../view/workflowmanager/WorkflowsRepo.java | 63 +-
.../view/workflowmanager/model/Workflow.java | 133 +--
.../views/wfmanager/src/main/resources/view.xml | 17 +
26 files changed, 2006 insertions(+), 1072 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java
new file mode 100644
index 0000000..8c96504
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/AssetDefinitionRefType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+public enum AssetDefinitionRefType {
+ HDFS,
+ DB
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java
new file mode 100644
index 0000000..238b002
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/Constants.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+public class Constants {
+ public static final String STATUS_FAILED = "failed";
+ public static final String STATUS_OK = "ok";
+ public static final String STATUS_KEY = "status";
+ public static final String MESSAGE_KEY = "message";
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java
new file mode 100644
index 0000000..6447cf2
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/EntityStatus.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+public enum EntityStatus {
+ DRAFT,
+ PUBLISHED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java
new file mode 100644
index 0000000..2779f05
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieDelegate.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OozieDelegate {
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(OozieDelegate.class);
+ private static final String OOZIEPARAM_PREFIX = "oozieparam.";
+ private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX
+ .length();
+ private static final String EQUAL_SYMBOL = "=";
+ private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes";
+ private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath";
+ private static final String USER_NAME_HEADER = "user.name";
+ private static final String USER_OOZIE_SUPER = "oozie";
+ private static final String DO_AS_HEADER = "doAs";
+ private static final String SERVICE_URI_PROP = "oozie.service.uri";
+ private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie";
+
+ private ViewContext viewContext;
+
+ private OozieUtils oozieUtils = new OozieUtils();
+ private final Utils utils = new Utils();
+ private final AmbariIOUtil ambariIOUtil;
+
+ public OozieDelegate(ViewContext viewContext) {
+ super();
+ this.viewContext = viewContext;
+ this.ambariIOUtil = new AmbariIOUtil(viewContext);
+ }
+
+ public String submitWorkflowJobToOozie(HttpHeaders headers,
+ String filePath, MultivaluedMap<String, String> queryParams,
+ JobType jobType) {
+ String nameNode = "hdfs://"
+ + viewContext.getCluster().getConfigurationValue("hdfs-site",
+ "dfs.namenode.rpc-address");
+
+ if (!queryParams.containsKey("config.nameNode")) {
+ ArrayList<String> nameNodes = new ArrayList<String>();
+ LOGGER.info("Namenode===" + nameNode);
+ nameNodes.add(nameNode);
+ queryParams.put("config.nameNode", nameNodes);
+ }
+
+ Map<String, String> workflowConigs = getWorkflowConfigs(filePath,
+ queryParams, jobType, nameNode);
+ String configXMl = oozieUtils.generateConfigXml(workflowConigs);
+ LOGGER.info("Config xml==" + configXMl);
+ HashMap<String, String> customHeaders = new HashMap<String, String>();
+ customHeaders.put("Content-Type", "application/xml;charset=UTF-8");
+ Response serviceResponse = consumeService(headers, getServiceUri()
+ + "/v2/jobs?" + getJobSumbitOozieParams(queryParams),
+ HttpMethod.POST, configXMl, customHeaders);
+
+ LOGGER.info("Resp from oozie status entity=="
+ + serviceResponse.getEntity());
+ if (serviceResponse.getEntity() instanceof String) {
+ return (String) serviceResponse.getEntity();
+ } else {
+ return "success";
+ }
+ }
+
+ public Response consumeService(HttpHeaders headers, String path,
+ MultivaluedMap<String, String> queryParameters, String method,
+ String body) throws Exception {
+ return consumeService(headers, this.buildUri(path, queryParameters),
+ method, body, null);
+ }
+
+ private Response consumeService(HttpHeaders headers, String urlToRead,
+ String method, String body, Map<String, String> customHeaders) {
+ Response response = null;
+ InputStream stream = readFromOozie(headers, urlToRead, method, body,
+ customHeaders);
+ String stringResponse = null;
+ try {
+ stringResponse = IOUtils.toString(stream);
+ } catch (IOException e) {
+ LOGGER.error("Error while converting stream to string", e);
+ throw new RuntimeException(e);
+ }
+ if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) {
+ response = Response.status(Response.Status.BAD_REQUEST)
+ .entity(stringResponse).type(MediaType.TEXT_PLAIN).build();
+ } else {
+ response = Response.status(Response.Status.OK)
+ .entity(stringResponse)
+ .type(utils.deduceType(stringResponse)).build();
+ }
+ return response;
+ }
+
+ public InputStream readFromOozie(HttpHeaders headers, String urlToRead,
+ String method, String body, Map<String, String> customHeaders) {
+
+ Map<String, String> newHeaders = utils.getHeaders(headers);
+ newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER);
+
+ newHeaders.put(DO_AS_HEADER, viewContext.getUsername());
+ newHeaders.put("Accept", MediaType.APPLICATION_JSON);
+ if (customHeaders != null) {
+ newHeaders.putAll(customHeaders);
+ }
+ LOGGER.info(String.format("Proxy request for url: [%s] %s", method,
+ urlToRead));
+
+ return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders);
+ }
+
+ private Map<String, String> getWorkflowConfigs(String filePath,
+ MultivaluedMap<String, String> queryParams, JobType jobType,
+ String nameNode) {
+ HashMap<String, String> workflowConigs = new HashMap<String, String>();
+ if (queryParams.containsKey("resourceManager")
+ && "useDefault".equals(queryParams.getFirst("resourceManager"))) {
+ String jobTrackerNode = viewContext.getCluster()
+ .getConfigurationValue("yarn-site",
+ "yarn.resourcemanager.address");
+ LOGGER.info("jobTrackerNode===" + jobTrackerNode);
+ workflowConigs.put("resourceManager", jobTrackerNode);
+ workflowConigs.put("jobTracker", jobTrackerNode);
+ }
+ if (queryParams != null) {
+ for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
+ if (entry.getKey().startsWith("config.")) {
+ if (entry.getValue() != null && entry.getValue().size() > 0) {
+ workflowConigs.put(entry.getKey().substring(7), entry
+ .getValue().get(0));
+ }
+ }
+ }
+ }
+
+ if (queryParams.containsKey("oozieconfig.useSystemLibPath")) {
+ String useSystemLibPath = queryParams
+ .getFirst("oozieconfig.useSystemLibPath");
+ workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY,
+ useSystemLibPath);
+ } else {
+ workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, "true");
+ }
+ if (queryParams.containsKey("oozieconfig.rerunOnFailure")) {
+ String rerunFailnodes = queryParams
+ .getFirst("oozieconfig.rerunOnFailure");
+ workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY,
+ rerunFailnodes);
+ } else {
+ workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, "true");
+ }
+ workflowConigs.put("user.name", viewContext.getUsername());
+ workflowConigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode
+ + filePath);
+ return workflowConigs;
+ }
+
+ private String getJobSumbitOozieParams(
+ MultivaluedMap<String, String> queryParams) {
+ StringBuilder query = new StringBuilder();
+ if (queryParams != null) {
+ for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
+ if (entry.getKey().startsWith(OOZIEPARAM_PREFIX)) {
+ if (entry.getValue() != null && entry.getValue().size() > 0) {
+ for (String val : entry.getValue()) {
+ query.append(
+ entry.getKey().substring(
+ OOZIEPARAM_PREFIX_LENGTH))
+ .append(EQUAL_SYMBOL).append(val)
+ .append("&");
+ }
+ }
+ }
+ }
+ }
+ return query.toString();
+ }
+
+ private String getServiceUri() {
+ String serviceURI = viewContext.getProperties().get(SERVICE_URI_PROP) != null ? viewContext
+ .getProperties().get(SERVICE_URI_PROP) : DEFAULT_SERVICE_URI;
+ return serviceURI;
+ }
+
+ private String buildUri(String absolutePath,
+ MultivaluedMap<String, String> queryParameters) {
+ int index = absolutePath.indexOf("proxy/") + 5;
+ absolutePath = absolutePath.substring(index);
+ String serviceURI = getServiceUri();
+ serviceURI += absolutePath;
+ MultivaluedMap<String, String> params = addOrReplaceUserName(queryParameters);
+ return serviceURI + utils.convertParamsToUrl(params);
+ }
+
+ private MultivaluedMap<String, String> addOrReplaceUserName(
+ MultivaluedMap<String, String> parameters) {
+ for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
+ if ("user.name".equals(entry.getKey())) {
+ ArrayList<String> vals = new ArrayList<String>(1);
+ vals.add(viewContext.getUsername());
+ entry.setValue(vals);
+ }
+ }
+ return parameters;
+ }
+
+ public String getDagUrl(String jobid) {
+ return getServiceUri() + "/v2/job/" + jobid + "?show=graph";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
index 08a166d..9dd02d4 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieProxyImpersonator.java
@@ -17,12 +17,14 @@
*/
package org.apache.oozie.ambari.view;
+import static org.apache.oozie.ambari.view.Constants.MESSAGE_KEY;
+import static org.apache.oozie.ambari.view.Constants.STATUS_KEY;
+import static org.apache.oozie.ambari.view.Constants.STATUS_OK;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import javax.inject.Inject;
@@ -40,8 +42,8 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
@@ -50,6 +52,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.security.AccessControlException;
+import org.apache.oozie.ambari.view.assets.AssetResource;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowManagerService;
import org.apache.oozie.ambari.view.workflowmanager.WorkflowsManagerResource;
import org.slf4j.Logger;
@@ -63,639 +66,449 @@ import com.google.inject.Singleton;
*/
@Singleton
public class OozieProxyImpersonator {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(OozieProxyImpersonator.class);
-
- private static final String OOZIEPARAM_PREFIX = "oozieparam.";
- private static final int OOZIEPARAM_PREFIX_LENGTH = OOZIEPARAM_PREFIX
- .length();
- private static final String EQUAL_SYMBOL = "=";
- private static final String OOZIE_WF_RERUN_FAILNODES_CONF_KEY = "oozie.wf.rerun.failnodes";
- private static final String OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY = "oozie.use.system.libpath";
-
- private ViewContext viewContext;
-
- private static final String USER_NAME_HEADER = "user.name";
- private static final String USER_OOZIE_SUPER = "oozie";
- private static final String DO_AS_HEADER = "doAs";
-
- private static final String SERVICE_URI_PROP = "oozie.service.uri";
- private static final String DEFAULT_SERVICE_URI = "http://sandbox.hortonworks.com:11000/oozie";
- private Utils utils = new Utils();
- private AmbariIOUtil ambariIOUtil;
- private OozieUtils oozieUtils = new OozieUtils();
- private HDFSFileUtils hdfsFileUtils;
- private WorkflowFilesService workflowFilesService;
- //private WorkflowManagerService workflowManagerService;
-
- private static enum ErrorCodes {
- OOZIE_SUBMIT_ERROR("error.oozie.submit", "Oozie Submit error"), OOZIE_IO_ERROR(
- "error.oozie.io", "Oozie I/O error"), FILE_ACCESS_ACL_ERROR(
- "error.file.access.control",
- "Access Error to file due to access control"), FILE_ACCESS_UNKNOWN_ERROR(
- "error.file.access", "Error accessing file"), WORKFLOW_PATH_EXISTS(
- "error.workflow.path.exists", "Worfklow path exists");
- private String errorCode;
- private String description;
-
- ErrorCodes(String errorCode, String description) {
- this.errorCode = errorCode;
- this.description = description;
- }
-
- public String getErrorCode() {
- return errorCode;
- }
-
- public String getDescription() {
- return description;
- }
- }
-
- @Inject
- public OozieProxyImpersonator(ViewContext viewContext) {
- this.viewContext = viewContext;
- hdfsFileUtils = new HDFSFileUtils(viewContext);
- workflowFilesService = new WorkflowFilesService(hdfsFileUtils);
- ambariIOUtil=new AmbariIOUtil(viewContext);
- //workflowManagerService = new WorkflowManagerService(viewContext);
- LOGGER.info(String.format(
- "OozieProxyImpersonator initialized for instance: %s",
- viewContext.getInstanceName()));
-
- }
-
- @Path("/fileServices")
- public FileServices fileServices() {
- return new FileServices(viewContext);
- }
-
- @Path("/wfprojects")
- public WorkflowsManagerResource workflowsManagerResource() {
- return new WorkflowsManagerResource(viewContext);
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(OozieProxyImpersonator.class);
+
+ private ViewContext viewContext;
+ private final Utils utils = new Utils();
+
+
+ private final HDFSFileUtils hdfsFileUtils;
+ private final WorkflowFilesService workflowFilesService;
+ private WorkflowManagerService workflowManagerService;
+ private static final boolean PROJ_MANAGER_ENABLED = false;
+ private final OozieDelegate oozieDelegate;
+ private final OozieUtils oozieUtils = new OozieUtils();
+ private final AssetResource assetResource;
+ private final AmbariIOUtil ambariIOUtil;
+ private static enum ErrorCodes {
+ OOZIE_SUBMIT_ERROR("error.oozie.submit", "Oozie Submit error"), OOZIE_IO_ERROR(
+ "error.oozie.io", "Oozie I/O error"), FILE_ACCESS_ACL_ERROR(
+ "error.file.access.control",
+ "Access Error to file due to access control"), FILE_ACCESS_UNKNOWN_ERROR(
+ "error.file.access", "Error accessing file"), WORKFLOW_PATH_EXISTS(
+ "error.workflow.path.exists", "Worfklow path exists");
+ private String errorCode;
+ private String description;
+
+ ErrorCodes(String errorCode, String description) {
+ this.errorCode = errorCode;
+ this.description = description;
+ }
+
+ public String getErrorCode() {
+ return errorCode;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+ }
+
+ @Inject
+ public OozieProxyImpersonator(ViewContext viewContext) {
+ this.viewContext = viewContext;
+ hdfsFileUtils = new HDFSFileUtils(viewContext);
+ workflowFilesService = new WorkflowFilesService(hdfsFileUtils);
+ this.oozieDelegate = new OozieDelegate(viewContext);
+ assetResource = new AssetResource(viewContext);
+ if (PROJ_MANAGER_ENABLED) {
+ workflowManagerService = new WorkflowManagerService(viewContext);
+ }
+ ambariIOUtil=new AmbariIOUtil(viewContext);
+
+ LOGGER.info(String.format(
+ "OozieProxyImpersonator initialized for instance: %s",
+ viewContext.getInstanceName()));
+
+ }
+
+ @Path("/fileServices")
+ public FileServices fileServices() {
+ return new FileServices(viewContext);
+ }
+
+ @Path("/wfprojects")
+ public WorkflowsManagerResource workflowsManagerResource() {
+ return new WorkflowsManagerResource(viewContext);
+ }
+
+ @Path("/assets")
+ public AssetResource assetResource() {
+ return this.assetResource;
+ }
+
+ @GET
+ @Path("/getCurrentUserName")
+ public Response getCurrentUserName() {
+ return Response.ok(viewContext.getUsername()).build();
+ }
+
+ @POST
+ @Path("/submitJob")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response submitJob(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("app.path") String appPath,
+ @QueryParam("projectId") String projectId,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
+ @QueryParam("description") String description,
+ @QueryParam("jobType") String jobType) {
+ LOGGER.info("submit workflow job called");
+ return submitJobInternal(postBody, headers, ui, appPath, overwrite,
+ JobType.valueOf(jobType), projectId, description);
+ }
+
+ @POST
+ @Path("/saveWorkflow")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response saveWorkflow(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("app.path") String appPath, @QueryParam("description") String description,
+ @QueryParam("projectId") String projectId, @QueryParam("jobType") String jobType,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
+ LOGGER.info("save workflow called");
+ if (StringUtils.isEmpty(appPath)) {
+ throw new RuntimeException("app path can't be empty.");
+ }
+ appPath = appPath.trim();
+ if (!overwrite) {
+ boolean fileExists = hdfsFileUtils.fileExists(appPath);
+ if (fileExists) {
+ return getFileExistsResponse();
+ }
+ }
+ postBody = utils.formatXml(postBody);
+ try {
+ String filePath = workflowFilesService.createWorkflowFile(appPath,
+ postBody, overwrite);
+ LOGGER.info(String.format(
+ "submit workflow job done. filePath=[%s]", filePath));
+ if (PROJ_MANAGER_ENABLED) {
+ JobType deducedJobType = oozieUtils.deduceJobType(postBody);
+ String workflowName = oozieUtils.deduceWorkflowNameFromXml(postBody);
+ workflowManagerService.saveWorkflow(projectId, appPath,
+ deducedJobType, description,
+ viewContext.getUsername(), workflowName);
+ }
+
+ return Response.ok().build();
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage(), ex);
+ return getRespCodeForException(ex);
+
+ }
+ }
+
+ @POST
+ @Path("/saveWorkflowDraft")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response saveDraft(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("app.path") String appPath,
+ @QueryParam("projectId") String projectId, @QueryParam("description") String description,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite, @QueryParam("jobType") String jobTypeStr)
+ throws IOException {
+ LOGGER.info("save workflow called");
+ if (StringUtils.isEmpty(appPath)) {
+ throw new RuntimeException("app path can't be empty.");
+ }
+ appPath = appPath.trim();
+ workflowFilesService.saveDraft(appPath, postBody, overwrite);
+ if (PROJ_MANAGER_ENABLED) {
+ JobType jobType = StringUtils.isEmpty(jobTypeStr) ? JobType.WORKFLOW : JobType.valueOf(jobTypeStr);
+ String name = oozieUtils.deduceWorkflowNameFromJson(postBody);
+ workflowManagerService.saveWorkflow(projectId, appPath,
+ jobType, description,
+ viewContext.getUsername(), name);
+ }
+ return Response.ok().build();
+ }
+
+ @POST
+ @Path("/publishAsset")
+ @Consumes({MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML})
+ public Response publishAsset(String postBody, @Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath,
+ @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
+ LOGGER.info("publish asset called");
+ if (StringUtils.isEmpty(uploadPath)) {
+ throw new RuntimeException("upload path can't be empty.");
+ }
+ uploadPath = uploadPath.trim();
+ Map<String, String> validateAsset = assetResource.validateAsset(headers, postBody,
+ ui.getQueryParameters());
+ if (!STATUS_OK.equals(validateAsset.get(STATUS_KEY))) {
+ return Response.status(Status.BAD_REQUEST).entity(
+ validateAsset.get(MESSAGE_KEY)).build();
+ }
+ return saveAsset(postBody, uploadPath, overwrite);
+ }
+
+ private Response saveAsset(String postBody, String uploadPath,
+ Boolean overwrite) {
+ uploadPath = workflowFilesService.getAssetFileName(uploadPath);
+ if (!overwrite) {
+ boolean fileExists = hdfsFileUtils.fileExists(uploadPath);
+ if (fileExists) {
+ return getFileExistsResponse();
+ }
+ }
+ postBody = utils.formatXml(postBody);
+ try {
+ String filePath = workflowFilesService.createAssetFile(uploadPath,
+ postBody, overwrite);
+ LOGGER.info(String.format("publish asset job done. filePath=[%s]",
+ filePath));
+ return Response.ok().build();
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage(), ex);
+ return getRespCodeForException(ex);
+ }
+ }
+
+
+ @GET
+ @Path("/readWorkflowDraft")
+ public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) {
+ if (StringUtils.isEmpty(workflowPath)) {
+ throw new RuntimeException("workflowXmlPath can't be empty.");
+ }
+ try {
+ final InputStream is = workflowFilesService.readDraft(workflowPath);
+ StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException,
+ WebApplicationException {
+ IOUtils.copy(is, os);
+ is.close();
+ os.close();
+ }
+ };
+ return Response.ok(streamer).status(200).build();
+ } catch (IOException e) {
+ return getRespCodeForException(e);
+ }
+ }
+
+ @POST
+ @Path("/discardWorkflowDraft")
+ public Response discardDraft(
+ @QueryParam("workflowXmlPath") String workflowPath)
+ throws IOException {
+ workflowFilesService.discardDraft(workflowPath);
+ return Response.ok().build();
+ }
+
+ private Response submitJobInternal(String postBody, HttpHeaders headers,
+ UriInfo ui, String appPath, Boolean overwrite, JobType jobType,
+ String projectId, String description) {
+ if (StringUtils.isEmpty(appPath)) {
+ throw new RuntimeException("app path can't be empty.");
+ }
+ appPath = appPath.trim();
+ if (!overwrite) {
+ boolean fileExists = hdfsFileUtils.fileExists(appPath);
+ if (fileExists) {
+ return getFileExistsResponse();
+ }
+ }
+ postBody = utils.formatXml(postBody);
+ try {
+ String filePath = hdfsFileUtils.writeToFile(appPath, postBody,
+ overwrite);
+ LOGGER.info(String.format(
+ "submit workflow job done. filePath=[%s]", filePath));
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage(), ex);
+ return getRespCodeForException(ex);
+
+ }
+ if (PROJ_MANAGER_ENABLED) {
+ String name = oozieUtils.deduceWorkflowNameFromXml(postBody);
+ workflowManagerService.saveWorkflow(projectId, appPath, jobType,
+ "todo description", viewContext.getUsername(), name);
+ }
+
+ String response = oozieDelegate.submitWorkflowJobToOozie(headers,
+ appPath, ui.getQueryParameters(), jobType);
+ if (response != null && response.trim().startsWith("{")) {
+ // dealing with oozie giving error but with 200 response.
+ return Response.status(Response.Status.OK).entity(response).build();
+ } else {
+ HashMap<String, String> resp = new HashMap<String, String>();
+ resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode());
+ resp.put("message", response);
+ return Response.status(Response.Status.BAD_REQUEST).entity(resp)
+ .build();
+ }
+
+ }
+
+ private Response getRespCodeForException(Exception ex) {
+ if (ex instanceof AccessControlException) {
+ HashMap<String, String> errorDetails = getErrorDetails(
+ ErrorCodes.FILE_ACCESS_ACL_ERROR.getErrorCode(),
+ ErrorCodes.FILE_ACCESS_ACL_ERROR.getDescription(), ex);
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(errorDetails).build();
+ } else if (ex instanceof IOException) {
+ HashMap<String, String> errorDetails = getErrorDetails(
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(errorDetails).build();
+ } else {
+ HashMap<String, String> errorDetails = getErrorDetails(
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
+ ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(errorDetails).build();
+ }
+
+ }
+
+ private Response getFileExistsResponse() {
+ HashMap<String, String> resp = new HashMap<String, String>();
+ resp.put("status", ErrorCodes.WORKFLOW_PATH_EXISTS.getErrorCode());
+ resp.put("message", ErrorCodes.WORKFLOW_PATH_EXISTS.getDescription());
+ return Response.status(Response.Status.BAD_REQUEST).entity(resp)
+ .build();
+ }
+
+ @GET
+ @Path("/readWorkflowDetail")
+ public Response isDraftAvailable(
+ @QueryParam("workflowXmlPath") String workflowPath) {
+ WorkflowFileInfo workflowDetails = workflowFilesService
+ .getWorkflowDetails(workflowPath);
+ return Response.ok(workflowDetails).build();
+ }
+
+ @GET
+ @Path("/readWorkflowXml")
+ public Response readWorkflowXxml(
+ @QueryParam("workflowXmlPath") String workflowPath) {
+ if (StringUtils.isEmpty(workflowPath)) {
+ throw new RuntimeException("workflowXmlPath can't be empty.");
+ }
+ try {
+ final InputStream is = workflowFilesService
+ .readWorkflowXml(workflowPath);
+ StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException,
+ WebApplicationException {
+ IOUtils.copy(is, os);
+ is.close();
+ os.close();
+ }
+ };
+ return Response.ok(streamer).status(200).build();
+ } catch (IOException e) {
+ return getRespCodeForException(e);
+ }
+ }
+
+ private HashMap<String, String> getErrorDetails(String status,
+ String message, Exception ex) {
+ HashMap<String, String> resp = new HashMap<String, String>();
+ resp.put("status", status);
+ if (message != null) {
+ resp.put("message", message);
+ }
+ if (ex != null) {
+ resp.put("stackTrace", ExceptionUtils.getFullStackTrace(ex));
+ }
+ return resp;
+ }
+
+ @GET
+ @Path("/getDag")
+ @Produces("image/png")
+ public Response getDag(@Context HttpHeaders headers,
+ @Context UriInfo ui, @QueryParam("jobid") String jobid) {
+ Map<String, String> newHeaders = utils.getHeaders(headers);
+ final InputStream is = oozieDelegate.readFromOozie(headers,
+ oozieDelegate.getDagUrl(jobid), HttpMethod.GET, null,
+ newHeaders);
+ StreamingOutput streamer = new StreamingOutput() {
+ @Override
+ public void write(OutputStream os) throws IOException,
+ WebApplicationException {
+ IOUtils.copy(is, os);
+ is.close();
+ os.close();
+ }
+
+ };
+ return Response.ok(streamer).status(200).build();
+ }
+
+ @GET
+ @Path("/{path: .*}")
+ public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) {
+ try {
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.GET, null);
+ } catch (Exception ex) {
+ LOGGER.error("Error in GET proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ @POST
+ @Path("/{path: .*}")
+ public Response handlePost(String xml, @Context HttpHeaders headers,
+ @Context UriInfo ui) {
+ try {
+
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.POST, xml);
+ } catch (Exception ex) {
+ LOGGER.error("Error in POST proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ @DELETE
+ @Path("/{path: .*}")
+ public Response handleDelete(@Context HttpHeaders headers,
+ @Context UriInfo ui) {
+ try {
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.POST, null);
+ } catch (Exception ex) {
+ LOGGER.error("Error in DELETE proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ @PUT
+ @Path("/{path: .*}")
+ public Response handlePut(String body, @Context HttpHeaders headers,
+ @Context UriInfo ui) {
+ try {
+ return oozieDelegate.consumeService(headers, ui.getAbsolutePath()
+ .getPath(), ui.getQueryParameters(), HttpMethod.PUT, body);
+ } catch (Exception ex) {
+ LOGGER.error("Error in PUT proxy", ex);
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(getErrorDetailsForException("Oozie", ex)).build();
+ }
+ }
+
+ private Map<String, String> getErrorDetailsForException(String component,
+ Exception ex) {
+ String errorCode = component + "exception";
+ String errorMessage = component + " Exception";
+ if (ex instanceof RuntimeException) {
+ Throwable cause = ex.getCause();
+ if (cause instanceof IOException) {
+ errorCode = component + "io.exception";
+ errorMessage = component + "IO Exception";
+ }
}
-
- @GET
- @Path("/getCurrentUserName")
- public Response getCurrentUserName() {
- return Response.ok(viewContext.getUsername()).build();
- }
-
- @POST
- @Path("/submitJob")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response submitJob(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite,
- @QueryParam("jobType") String jobType) {
- LOGGER.info("submit workflow job called");
- return submitJobInternal(postBody, headers, ui, appPath, overwrite,
- JobType.valueOf(jobType));
- }
-
- @POST
- @Path("/submitWorkflow")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response submitWorkflow(String postBody,
- @Context HttpHeaders headers, @Context UriInfo ui,
- @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
- LOGGER.info("submit workflow job called");
- return submitJobInternal(postBody, headers, ui, appPath, overwrite,
- JobType.WORKFLOW);
- }
-
- @POST
- @Path("/saveWorkflow")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response saveWorkflow(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
- LOGGER.info("save workflow called");
- if (StringUtils.isEmpty(appPath)) {
- throw new RuntimeException("app path can't be empty.");
- }
- appPath = appPath.trim();
- if (!overwrite) {
- boolean fileExists = hdfsFileUtils.fileExists(appPath);
- if (fileExists) {
- return getFileExistsResponse();
- }
- }
- postBody = utils.formatXml(postBody);
- try {
- String filePath = workflowFilesService.createWorkflowFile(appPath,
- postBody, overwrite);
- LOGGER.info(String.format(
- "submit workflow job done. filePath=[%s]", filePath));
- /* workflowManagerService.saveWorkflow(appPath, JobType.WORKFLOW,
- "todo description", viewContext.getUsername());*/
- return Response.ok().build();
- } catch (Exception ex) {
- LOGGER.error(ex.getMessage(), ex);
- return getRespCodeForException(ex);
-
- }
- }
-
- @POST
- @Path("/publishAsset")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response publishAsset(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("uploadPath") String uploadPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite) {
- LOGGER.info("publish asset called");
- if (StringUtils.isEmpty(uploadPath)) {
- throw new RuntimeException("upload path can't be empty.");
- }
- uploadPath = uploadPath.trim();
- Response dryRunResponse = validateAsset(headers, postBody, ui.getQueryParameters());
- if (dryRunResponse.getStatus() == 200) {
- return saveAsset(postBody, uploadPath, overwrite);
- }
- return dryRunResponse;
- }
-
- private Response validateAsset(HttpHeaders headers, String postBody, MultivaluedMap<String, String> queryParams) {
- String workflowXml = oozieUtils.generateWorkflowXml(postBody);
- try {
- String tempWfPath = "/user/"+viewContext.getUsername()+"/tmpooziewfs/tempwf.xml";
- hdfsFileUtils.writeToFile(tempWfPath, workflowXml, true);
- queryParams.put("oozieparam.action",getAsList("dryrun"));
- queryParams.put("oozieconfig.rerunOnFailure",getAsList("false"));
- queryParams.put("oozieconfig.useSystemLibPath",getAsList("true"));
- queryParams.put("resourceManager",getAsList("useDefault"));
- String dryRunResp = submitWorkflowJobToOozie(headers,tempWfPath,queryParams,JobType.WORKFLOW);
- LOGGER.info(String.format("resp from validating asset=[%s]",dryRunResp));
- if (dryRunResp != null && dryRunResp.trim().startsWith("{")) {
- return Response.status(Response.Status.OK).entity(dryRunResp).build();
- } else {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode());
- resp.put("message", dryRunResp);
- //resp.put("stackTrace", dryRunResp);
- return Response.status(Response.Status.BAD_REQUEST).entity(resp).build();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private List<String> getAsList(String string) {
- ArrayList<String> li=new ArrayList<>(1);
- li.add(string);
- return li;
- }
-
- private Response saveAsset(String postBody, String uploadPath,
- Boolean overwrite) {
- uploadPath = workflowFilesService.getAssetFileName(uploadPath);
- if (!overwrite) {
- boolean fileExists = hdfsFileUtils.fileExists(uploadPath);
- if (fileExists) {
- return getFileExistsResponse();
- }
- }
- postBody = utils.formatXml(postBody);
- try {
- String filePath = workflowFilesService.createAssetFile(uploadPath,
- postBody, overwrite);
- LOGGER.info(String.format(
- "publish asset job done. filePath=[%s]", filePath));
- return Response.ok().build();
- } catch (Exception ex) {
- LOGGER.error(ex.getMessage(), ex);
- return getRespCodeForException(ex);
- }
- }
-
- @POST
- @Path("/saveWorkflowDraft")
- @Consumes({ MediaType.TEXT_PLAIN + "," + MediaType.TEXT_XML })
- public Response saveDraft(String postBody, @Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("app.path") String appPath,
- @DefaultValue("false") @QueryParam("overwrite") Boolean overwrite)
- throws IOException {
- LOGGER.info("save workflow called");
- if (StringUtils.isEmpty(appPath)) {
- throw new RuntimeException("app path can't be empty.");
- }
- appPath = appPath.trim();
- workflowFilesService.saveDraft(appPath, postBody, overwrite);
- /* workflowManagerService.saveWorkflow(appPath, JobType.WORKFLOW,
- "todo description", viewContext.getUsername());*/
- return Response.ok().build();
- }
-
- @GET
- @Path("/readWorkflowDraft")
- public Response readDraft(@QueryParam("workflowXmlPath") String workflowPath) {
- if (StringUtils.isEmpty(workflowPath)) {
- throw new RuntimeException("workflowXmlPath can't be empty.");
- }
- try {
- final InputStream is = workflowFilesService.readDraft(workflowPath);
- StreamingOutput streamer = new StreamingOutput() {
- @Override
- public void write(OutputStream os) throws IOException,
- WebApplicationException {
- IOUtils.copy(is, os);
- is.close();
- os.close();
- }
- };
- return Response.ok(streamer).status(200).build();
- } catch (IOException e) {
- return getRespCodeForException(e);
- }
- }
-
- @POST
- @Path("/discardWorkflowDraft")
- public Response discardDraft(@QueryParam("workflowXmlPath") String workflowPath) throws IOException{
- workflowFilesService.discardDraft(workflowPath);
- return Response.ok().build();
- }
-
- private Response submitJobInternal(String postBody, HttpHeaders headers,
- UriInfo ui, String appPath, Boolean overwrite, JobType jobType) {
- if (StringUtils.isEmpty(appPath)) {
- throw new RuntimeException("app path can't be empty.");
- }
- appPath = appPath.trim();
- if (!overwrite) {
- boolean fileExists = hdfsFileUtils.fileExists(appPath);
- if (fileExists) {
- return getFileExistsResponse();
- }
- }
- postBody = utils.formatXml(postBody);
- try {
- String filePath = hdfsFileUtils.writeToFile(appPath, postBody,
- overwrite);
- LOGGER.info(String.format(
- "submit workflow job done. filePath=[%s]", filePath));
- } catch (Exception ex) {
- LOGGER.error(ex.getMessage(), ex);
- return getRespCodeForException(ex);
-
- }
- /* workflowManagerService.saveWorkflow(appPath, jobType,
- "todo description", viewContext.getUsername());*/
- String response = submitWorkflowJobToOozie(headers, appPath,
- ui.getQueryParameters(), jobType);
- if (response != null && response.trim().startsWith("{")) {
- // dealing with oozie giving error but with 200 response.
- return Response.status(Response.Status.OK).entity(response).build();
- } else {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", ErrorCodes.OOZIE_SUBMIT_ERROR.getErrorCode());
- resp.put("message", response);
- return Response.status(Response.Status.BAD_REQUEST).entity(resp)
- .build();
- }
-
- }
-
- private Response getRespCodeForException(Exception ex) {
- if (ex instanceof AccessControlException) {
- HashMap<String, String> errorDetails = getErrorDetails(
- ErrorCodes.FILE_ACCESS_ACL_ERROR.getErrorCode(),
- ErrorCodes.FILE_ACCESS_ACL_ERROR.getDescription(), ex);
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(errorDetails).build();
- } else if (ex instanceof IOException) {
- HashMap<String, String> errorDetails = getErrorDetails(
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(errorDetails).build();
- } else {
- HashMap<String, String> errorDetails = getErrorDetails(
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getErrorCode(),
- ErrorCodes.FILE_ACCESS_UNKNOWN_ERROR.getDescription(), ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(errorDetails).build();
- }
-
- }
-
- private Response getFileExistsResponse() {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", ErrorCodes.WORKFLOW_PATH_EXISTS.getErrorCode());
- resp.put("message", ErrorCodes.WORKFLOW_PATH_EXISTS.getDescription());
- return Response.status(Response.Status.BAD_REQUEST).entity(resp)
- .build();
- }
-
- @GET
- @Path("/readWorkflowDetail")
- public Response isDraftAvailable(@QueryParam("workflowXmlPath") String workflowPath){
- WorkflowFileInfo workflowDetails = workflowFilesService.getWorkflowDetails(workflowPath);
- return Response.ok(workflowDetails).build();
- }
-
- @GET
- @Path("/readWorkflowXml")
- public Response readWorkflowXxml(
- @QueryParam("workflowXmlPath") String workflowPath) {
- if (StringUtils.isEmpty(workflowPath)) {
- throw new RuntimeException("workflowXmlPath can't be empty.");
- }
- try {
- final InputStream is = workflowFilesService.readWorkflowXml(workflowPath);
- StreamingOutput streamer = new StreamingOutput() {
- @Override
- public void write(OutputStream os) throws IOException,
- WebApplicationException {
- IOUtils.copy(is, os);
- is.close();
- os.close();
- }
- };
- return Response.ok(streamer).status(200).build();
- } catch (IOException e) {
- return getRespCodeForException(e);
- }
- }
-
- private HashMap<String, String> getErrorDetails(String status,
- String message, Exception ex) {
- HashMap<String, String> resp = new HashMap<String, String>();
- resp.put("status", status);
- if (message != null) {
- resp.put("message", message);
- }
- if (ex != null) {
- resp.put("stackTrace", ExceptionUtils.getFullStackTrace(ex));
- }
- return resp;
- }
-
- @GET
- @Path("/getDag")
- @Produces("image/png")
- public Response submitWorkflow(@Context HttpHeaders headers,
- @Context UriInfo ui, @QueryParam("jobid") String jobid) {
- String imgUrl = getServiceUri() + "/v2/job/" + jobid + "?show=graph";
- Map<String, String> newHeaders = utils.getHeaders(headers);
- final InputStream is = readFromOozie(headers, imgUrl, HttpMethod.GET,
- null, newHeaders);
- StreamingOutput streamer = new StreamingOutput() {
-
- @Override
- public void write(OutputStream os) throws IOException,
- WebApplicationException {
- IOUtils.copy(is, os);
- is.close();
- os.close();
- }
-
- };
- return Response.ok(streamer).status(200).build();
- }
-
- @GET
- @Path("/{path: .*}")
- public Response handleGet(@Context HttpHeaders headers, @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.GET, null);
- } catch (Exception ex) {
- LOGGER.error("Error in GET proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- @POST
- @Path("/{path: .*}")
- public Response handlePost(String xml, @Context HttpHeaders headers,
- @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.POST, xml);
- } catch (Exception ex) {
- LOGGER.error("Error in POST proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- @DELETE
- @Path("/{path: .*}")
- public Response handleDelete(@Context HttpHeaders headers,
- @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.POST, null);
- } catch (Exception ex) {
- LOGGER.error("Error in DELETE proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- @PUT
- @Path("/{path: .*}")
- public Response handlePut(String body, @Context HttpHeaders headers,
- @Context UriInfo ui) {
- try {
- String serviceURI = buildURI(ui);
- return consumeService(headers, serviceURI, HttpMethod.PUT, body);
- } catch (Exception ex) {
- LOGGER.error("Error in PUT proxy", ex);
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
- .entity(getErrorDetailsForException("Oozie", ex)).build();
- }
- }
-
- private Map<String, String> getErrorDetailsForException(String component,
- Exception ex) {
- String errorCode = component + "exception";
- String errorMessage = component + " Exception";
- if (ex instanceof RuntimeException) {
- Throwable cause = ex.getCause();
- if (cause instanceof IOException) {
- errorCode = component + "io.exception";
- errorMessage = component + "IO Exception";
- }
- }
- return getErrorDetails(errorCode, errorMessage, ex);
- }
-
- private String submitWorkflowJobToOozie(HttpHeaders headers,
- String filePath, MultivaluedMap<String, String> queryParams,
- JobType jobType) {
- String nameNode = "hdfs://"
- + viewContext.getCluster().getConfigurationValue("hdfs-site",
- "dfs.namenode.rpc-address");
-
- if (!queryParams.containsKey("config.nameNode")) {
- ArrayList<String> nameNodes = new ArrayList<String>();
- LOGGER.info("Namenode===" + nameNode);
- nameNodes.add(nameNode);
- queryParams.put("config.nameNode", nameNodes);
- }
-
- Map<String, String> workflowConigs = getWorkflowConfigs(filePath,
- queryParams, jobType, nameNode);
- String configXMl = oozieUtils.generateConfigXml(workflowConigs);
- LOGGER.info("Config xml==" + configXMl);
- HashMap<String, String> customHeaders = new HashMap<String, String>();
- customHeaders.put("Content-Type", "application/xml;charset=UTF-8");
- Response serviceResponse = consumeService(headers, getServiceUri()
- + "/v2/jobs?" + getJobSumbitOozieParams(queryParams),
- HttpMethod.POST, configXMl, customHeaders);
-
- LOGGER.info("Resp from oozie status entity=="
- + serviceResponse.getEntity());
- if (serviceResponse.getEntity() instanceof String) {
- return (String) serviceResponse.getEntity();
- } else {
- return "success";
- }
-
- }
-
- private Map<String, String> getWorkflowConfigs(String filePath,
- MultivaluedMap<String, String> queryParams, JobType jobType,
- String nameNode) {
- HashMap<String, String> workflowConigs = new HashMap<String, String>();
- if (queryParams.containsKey("resourceManager")
- && "useDefault".equals(queryParams.getFirst("resourceManager"))) {
- String jobTrackerNode = viewContext.getCluster()
- .getConfigurationValue("yarn-site",
- "yarn.resourcemanager.address");
- LOGGER.info("jobTrackerNode===" + jobTrackerNode);
- workflowConigs.put("resourceManager", jobTrackerNode);
- workflowConigs.put("jobTracker", jobTrackerNode);
- }
- if (queryParams != null) {
- for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
- if (entry.getKey().startsWith("config.")) {
- if (entry.getValue() != null && entry.getValue().size() > 0) {
- workflowConigs.put(entry.getKey().substring(7), entry
- .getValue().get(0));
- }
- }
- }
- }
-
- if (queryParams.containsKey("oozieconfig.useSystemLibPath")) {
- String useSystemLibPath = queryParams
- .getFirst("oozieconfig.useSystemLibPath");
- workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY,
- useSystemLibPath);
- } else {
- workflowConigs.put(OOZIE_USE_SYSTEM_LIBPATH_CONF_KEY, "true");
- }
- if (queryParams.containsKey("oozieconfig.rerunOnFailure")) {
- String rerunFailnodes = queryParams
- .getFirst("oozieconfig.rerunOnFailure");
- workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY,
- rerunFailnodes);
- } else {
- workflowConigs.put(OOZIE_WF_RERUN_FAILNODES_CONF_KEY, "true");
- }
- workflowConigs.put("user.name", viewContext.getUsername());
- workflowConigs.put(oozieUtils.getJobPathPropertyKey(jobType), nameNode
- + filePath);
- return workflowConigs;
- }
-
- private String getJobSumbitOozieParams(
- MultivaluedMap<String, String> queryParams) {
- StringBuilder query = new StringBuilder();
- if (queryParams != null) {
- for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
- if (entry.getKey().startsWith(OOZIEPARAM_PREFIX)) {
- if (entry.getValue() != null && entry.getValue().size() > 0) {
- for (String val : entry.getValue()) {
- query.append(
- entry.getKey().substring(
- OOZIEPARAM_PREFIX_LENGTH))
- .append(EQUAL_SYMBOL).append(val)
- .append("&");
- }
- }
- }
- }
- }
- return query.toString();
- }
-
- private String buildURI(UriInfo ui) {
- String uiURI = ui.getAbsolutePath().getPath();
- int index = uiURI.indexOf("proxy/") + 5;
- uiURI = uiURI.substring(index);
- String serviceURI = getServiceUri();
- serviceURI += uiURI;
- MultivaluedMap<String, String> params = addOrReplaceUserName(ui
- .getQueryParameters());
- return serviceURI + utils.convertParamsToUrl(params);
- }
-
- private MultivaluedMap<String, String> addOrReplaceUserName(
- MultivaluedMap<String, String> parameters) {
- for (Map.Entry<String, List<String>> entry : parameters.entrySet()) {
- if ("user.name".equals(entry.getKey())) {
- ArrayList<String> vals = new ArrayList<String>(1);
- vals.add(viewContext.getUsername());
- entry.setValue(vals);
- }
- }
- return parameters;
- }
-
- private String getServiceUri() {
- String serviceURI = viewContext.getProperties().get(SERVICE_URI_PROP) != null ? viewContext
- .getProperties().get(SERVICE_URI_PROP) : DEFAULT_SERVICE_URI;
- return serviceURI;
- }
-
- private Response consumeService(HttpHeaders headers, String urlToRead,
- String method, String body) throws Exception {
- return consumeService(headers, urlToRead, method, body, null);
- }
-
- private Response consumeService(HttpHeaders headers, String urlToRead,
- String method, String body, Map<String, String> customHeaders) {
- Response response = null;
- InputStream stream = readFromOozie(headers, urlToRead, method, body,
- customHeaders);
- String stringResponse = null;
- try {
- stringResponse = IOUtils.toString(stream);
- } catch (IOException e) {
- LOGGER.error("Error while converting stream to string", e);
- throw new RuntimeException(e);
- }
- if (stringResponse.contains(Response.Status.BAD_REQUEST.name())) {
- response = Response.status(Response.Status.BAD_REQUEST)
- .entity(stringResponse).type(MediaType.TEXT_PLAIN).build();
- } else {
- response = Response.status(Response.Status.OK)
- .entity(stringResponse)
- .type(utils.deduceType(stringResponse)).build();
- }
- return response;
- }
-
- private InputStream readFromOozie(HttpHeaders headers, String urlToRead,
- String method, String body, Map<String, String> customHeaders) {
-
- Map<String, String> newHeaders = utils.getHeaders(headers);
- newHeaders.put(USER_NAME_HEADER, USER_OOZIE_SUPER);
-
- newHeaders.put(DO_AS_HEADER, viewContext.getUsername());
- newHeaders.put("Accept", MediaType.APPLICATION_JSON);
- if (customHeaders != null) {
- newHeaders.putAll(customHeaders);
- }
- LOGGER.info(String.format("Proxy request for url: [%s] %s", method,
- urlToRead));
-
- return ambariIOUtil.readFromUrl(urlToRead, method, body, newHeaders);
- }
+ return getErrorDetails(errorCode, errorMessage, ex);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
index f002102..9791c47 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/OozieUtils.java
@@ -32,92 +32,144 @@ import org.w3c.dom.Element;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
public class OozieUtils {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(OozieUtils.class);
- private Utils utils = new Utils();
-
- public String generateConfigXml(Map<String, String> map) {
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- DocumentBuilder db;
- try {
- db = dbf.newDocumentBuilder();
- Document doc = db.newDocument();
- Element configElement = doc.createElement("configuration");
- doc.appendChild(configElement);
- for (Map.Entry<String, String> entry : map.entrySet()) {
- Element propElement = doc.createElement("property");
- configElement.appendChild(propElement);
- Element nameElem = doc.createElement("name");
- nameElem.setTextContent(entry.getKey());
- Element valueElem = doc.createElement("value");
- valueElem.setTextContent(entry.getValue());
- propElement.appendChild(nameElem);
- propElement.appendChild(valueElem);
- }
- return utils.generateXml(doc);
- } catch (ParserConfigurationException e) {
- LOGGER.error("error in generating config xml", e);
- throw new RuntimeException(e);
- }
- }
- public String getJobPathPropertyKey(JobType jobType) {
- switch (jobType) {
- case WORKFLOW:
- return "oozie.wf.application.path";
- case COORDINATOR:
- return "oozie.coord.application.path";
- case BUNDLE:
- return "oozie.bundle.application.path";
- }
- throw new RuntimeException("Unknown Job Type");
- }
-
- public String generateWorkflowXml(String actionNodeXml) {
- DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- DocumentBuilder db;
- try {
- db = dbf.newDocumentBuilder();
- Document doc = db.newDocument();
-
- Element workflowElement = doc.createElement("workflow-app");
- workflowElement.setAttribute("name", "testWorkflow");
- workflowElement.setAttribute("xmlns", "uri:oozie:workflow:0.5");
- doc.appendChild(workflowElement);
-
- Element startElement = doc.createElement("start");
- startElement.setAttribute("to", "testAction");
- workflowElement.appendChild(startElement);
-
- Element actionElement = doc.createElement("action");
- actionElement.setAttribute("name", "testAction");
- Element actionSettingsElement = db.parse(new InputSource(new StringReader(actionNodeXml))).getDocumentElement();
- actionElement.appendChild(doc.importNode(actionSettingsElement, true));
- workflowElement.appendChild(actionElement);
-
- Element actionOkTransitionElement = doc.createElement("ok");
- actionOkTransitionElement.setAttribute("to", "end");
- actionElement.appendChild(actionOkTransitionElement);
-
- Element actionErrorTransitionElement = doc.createElement("error");
- actionErrorTransitionElement.setAttribute("to", "kill");
- actionElement.appendChild(actionErrorTransitionElement);
-
- Element killElement = doc.createElement("kill");
- killElement.setAttribute("name", "kill");
- Element killMessageElement = doc.createElement("message");
- killMessageElement.setTextContent("Kill node message");
- killElement.appendChild(killMessageElement);
- workflowElement.appendChild(killElement);
-
- Element endElement = doc.createElement("end");
- endElement.setAttribute("name", "end");
- workflowElement.appendChild(endElement);
-
- return utils.generateXml(doc);
- } catch (ParserConfigurationException | SAXException | IOException e) {
- LOGGER.error("error in generating workflow xml", e);
- throw new RuntimeException(e);
- }
- }
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(OozieUtils.class);
+ private Utils utils = new Utils();
+
+ public String generateConfigXml(Map<String, String> map) {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db;
+ try {
+ db = dbf.newDocumentBuilder();
+ Document doc = db.newDocument();
+ Element configElement = doc.createElement("configuration");
+ doc.appendChild(configElement);
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ Element propElement = doc.createElement("property");
+ configElement.appendChild(propElement);
+ Element nameElem = doc.createElement("name");
+ nameElem.setTextContent(entry.getKey());
+ Element valueElem = doc.createElement("value");
+ valueElem.setTextContent(entry.getValue());
+ propElement.appendChild(nameElem);
+ propElement.appendChild(valueElem);
+ }
+ return utils.generateXml(doc);
+ } catch (ParserConfigurationException e) {
+ LOGGER.error("error in generating config xml", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getJobPathPropertyKey(JobType jobType) {
+ switch (jobType) {
+ case WORKFLOW:
+ return "oozie.wf.application.path";
+ case COORDINATOR:
+ return "oozie.coord.application.path";
+ case BUNDLE:
+ return "oozie.bundle.application.path";
+ }
+ throw new RuntimeException("Unknown Job Type");
+ }
+
+ public JobType deduceJobType(String xml) {
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = null;
+
+ db = dbf.newDocumentBuilder();
+ InputSource is = new InputSource();
+ is.setCharacterStream(new StringReader(xml));
+
+ Document doc = db.parse(is);
+ String rootNode = doc.getDocumentElement().getNodeName();
+ if ("workflow-app".equals(rootNode)) {
+ return JobType.WORKFLOW;
+ } else if ("coordinator-app".equals(rootNode)) {
+ return JobType.COORDINATOR;
+ } else if ("bundle-app".equals(rootNode)) {
+ return JobType.BUNDLE;
+ }
+ throw new RuntimeException("invalid xml submitted");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String deduceWorkflowNameFromJson(String json) {
+ JsonElement jsonElement = new JsonParser().parse(json);
+ String name = jsonElement.getAsJsonObject().get("name").getAsString();
+ return name;
+ }
+
+ public String deduceWorkflowNameFromXml(String xml) {
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = dbf.newDocumentBuilder();
+ InputSource is = new InputSource();
+ is.setCharacterStream(new StringReader(xml));
+ Document doc = db.parse(is);
+ String name = doc.getDocumentElement().getAttributeNode("name").getValue();
+ return name;
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String generateWorkflowXml(String actionNodeXml) {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db;
+ try {
+ db = dbf.newDocumentBuilder();
+ Document doc = db.newDocument();
+
+ Element workflowElement = doc.createElement("workflow-app");
+ workflowElement.setAttribute("name", "testWorkflow");
+ workflowElement.setAttribute("xmlns", "uri:oozie:workflow:0.5");
+ doc.appendChild(workflowElement);
+
+ Element startElement = doc.createElement("start");
+ startElement.setAttribute("to", "testAction");
+ workflowElement.appendChild(startElement);
+
+ Element actionElement = doc.createElement("action");
+ actionElement.setAttribute("name", "testAction");
+ Element actionSettingsElement = db.parse(
+ new InputSource(new StringReader(actionNodeXml)))
+ .getDocumentElement();
+ actionElement.appendChild(doc.importNode(actionSettingsElement,
+ true));
+ workflowElement.appendChild(actionElement);
+
+ Element actionOkTransitionElement = doc.createElement("ok");
+ actionOkTransitionElement.setAttribute("to", "end");
+ actionElement.appendChild(actionOkTransitionElement);
+
+ Element actionErrorTransitionElement = doc.createElement("error");
+ actionErrorTransitionElement.setAttribute("to", "kill");
+ actionElement.appendChild(actionErrorTransitionElement);
+
+ Element killElement = doc.createElement("kill");
+ killElement.setAttribute("name", "kill");
+ Element killMessageElement = doc.createElement("message");
+ killMessageElement.setTextContent("Kill node message");
+ killElement.appendChild(killMessageElement);
+ workflowElement.appendChild(killElement);
+
+ Element endElement = doc.createElement("end");
+ endElement.setAttribute("name", "end");
+ workflowElement.appendChild(endElement);
+
+ return utils.generateXml(doc);
+ } catch (ParserConfigurationException | SAXException | IOException e) {
+ LOGGER.error("error in generating workflow xml", e);
+ throw new RuntimeException(e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java
new file mode 100644
index 0000000..3a57d63
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/ServiceFormattedException.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.ambari.view;
+
+import java.util.HashMap;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.json.simple.JSONObject;
+
+public class ServiceFormattedException extends WebApplicationException {
+ private final static int STATUS = 500;
+
+ public ServiceFormattedException(Throwable exception) {
+ super(errorEntity(exception.getMessage(), exception));
+ }
+
+ public ServiceFormattedException(String message, Throwable exception) {
+ super(errorEntity(message, exception));
+ }
+
+ protected static Response errorEntity(String message, Throwable e) {
+ HashMap<String, Object> response = new HashMap<String, Object>();
+ response.put("message", message);
+ String trace = null;
+ if (e != null) {
+ trace = ExceptionUtils.getStackTrace(e);
+ }
+ response.put("trace", trace);
+ response.put("status", STATUS);
+ return Response.status(STATUS).entity(new JSONObject(response))
+ .type(MediaType.APPLICATION_JSON).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
index 01bde47..f98df0d 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/WorkflowFilesService.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,98 +19,104 @@ package org.apache.oozie.ambari.view;
import java.io.IOException;
import java.io.InputStream;
+
import org.apache.hadoop.fs.FileStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WorkflowFilesService {
- private HDFSFileUtils hdfsFileUtils;
+ private final static Logger LOGGER = LoggerFactory
+ .getLogger(WorkflowFilesService.class);
+ private HDFSFileUtils hdfsFileUtils;
+
+ public WorkflowFilesService(HDFSFileUtils hdfsFileUtils) {
+ super();
+ this.hdfsFileUtils = hdfsFileUtils;
+ }
+
+ public String createWorkflowFile(String appPath, String content,
+ boolean overwrite) throws IOException {
+ return hdfsFileUtils.writeToFile(getWorkflowFileName(appPath), content,
+ overwrite);
+ }
+
+ public String createAssetFile(String appPath, String content,
+ boolean overwrite) throws IOException {
+ return hdfsFileUtils.writeToFile(appPath, content,
+ overwrite);
+ }
- public WorkflowFilesService(HDFSFileUtils hdfsFileUtils) {
- super();
- this.hdfsFileUtils = hdfsFileUtils;
- }
+ public String saveDraft(String appPath, String content, boolean overwrite)
+ throws IOException {
+ return hdfsFileUtils.writeToFile(getWorkflowDrafFileName(appPath),
+ content, overwrite);
+ }
- public String createWorkflowFile(String appPath, String content,
- boolean overwrite) throws IOException {
- return hdfsFileUtils.writeToFile(getWorkflowFileName(appPath), content,
- overwrite);
- }
-
- public String createAssetFile(String appPath, String content,
- boolean overwrite) throws IOException {
- return hdfsFileUtils.writeToFile(appPath, content,
- overwrite);
- }
+ public InputStream readDraft(String appPath) throws IOException {
+ return hdfsFileUtils.read(getWorkflowDrafFileName(appPath));
+ }
- public String saveDraft(String appPath, String content, boolean overwrite)
- throws IOException {
- return hdfsFileUtils.writeToFile(getWorkflowDrafFileName(appPath),
- content, overwrite);
- }
+ public InputStream readWorkflowXml(String appPath) throws IOException {
+ return hdfsFileUtils.read(getWorkflowFileName(appPath));
+ }
- public InputStream readDraft(String appPath) throws IOException {
- return hdfsFileUtils.read(getWorkflowDrafFileName(appPath));
- }
- public InputStream readWorkflowXml(String appPath) throws IOException {
- return hdfsFileUtils.read(getWorkflowFileName(appPath));
- }
+ private String getWorkflowDrafFileName(String appPath) {
+ return getWorkflowFileName(appPath).concat(".draft.json");
+ }
- private String getWorkflowDrafFileName(String appPath) {
- return getWorkflowFileName(appPath).concat(".draft.json");
- }
+ private String getWorkflowFileName(String appPath) {
+ String workflowFile = null;
+ if (appPath.endsWith(".xml")) {
+ workflowFile = appPath;
+ } else {
+ workflowFile = appPath + (appPath.endsWith("/") ? "" : "/")
+ + "workflow.xml";
+ }
+ return workflowFile;
+ }
- private String getWorkflowFileName(String appPath) {
- String workflowFile = null;
- if (appPath.endsWith(".xml")) {
- workflowFile = appPath;
- } else {
- workflowFile = appPath + (appPath.endsWith("/") ? "" : "/")
- + "workflow.xml";
- }
- return workflowFile;
- }
-
- public String getAssetFileName(String appPath) {
- String assetFile = null;
- if (appPath.endsWith(".xml")) {
- assetFile = appPath;
- } else {
- assetFile = appPath + (appPath.endsWith("/") ? "" : "/")
- + "asset.xml";
- }
- return assetFile;
- }
+ public String getAssetFileName(String appPath) {
+ String assetFile = null;
+ if (appPath.endsWith(".xml")) {
+ assetFile = appPath;
+ } else {
+ assetFile = appPath + (appPath.endsWith("/") ? "" : "/")
+ + "asset.xml";
+ }
+ return assetFile;
+ }
- public void discardDraft(String workflowPath) throws IOException {
- hdfsFileUtils.deleteFile(getWorkflowDrafFileName(workflowPath));
+ public void discardDraft(String workflowPath) throws IOException {
+ hdfsFileUtils.deleteFile(getWorkflowDrafFileName(workflowPath));
- }
+ }
- public WorkflowFileInfo getWorkflowDetails(String appPath) {
- WorkflowFileInfo workflowInfo = new WorkflowFileInfo();
- workflowInfo.setWorkflowPath(getWorkflowFileName(appPath));
- boolean draftExists = hdfsFileUtils
- .fileExists(getWorkflowDrafFileName(appPath));
- workflowInfo.setDraftExists(draftExists);
- boolean workflowExists = hdfsFileUtils.fileExists(getWorkflowFileName(appPath));
- FileStatus workflowFileStatus = null;
- if (workflowExists){
- workflowFileStatus = hdfsFileUtils
- .getFileStatus(getWorkflowFileName(appPath));
- workflowInfo.setWorkflowModificationTime(workflowFileStatus
- .getModificationTime());
- }
- if (draftExists) {
- FileStatus draftFileStatus = hdfsFileUtils
- .getFileStatus(getWorkflowDrafFileName(appPath));
- workflowInfo.setDraftModificationTime(draftFileStatus
- .getModificationTime());
- if (!workflowExists){
- workflowInfo.setIsDraftCurrent(true);
- }else{
- workflowInfo.setIsDraftCurrent(draftFileStatus.getModificationTime()
- - workflowFileStatus.getModificationTime() > 0);
- }
- }
- return workflowInfo;
- }
+ public WorkflowFileInfo getWorkflowDetails(String appPath) {
+ WorkflowFileInfo workflowInfo = new WorkflowFileInfo();
+ workflowInfo.setWorkflowPath(getWorkflowFileName(appPath));
+ boolean draftExists = hdfsFileUtils
+ .fileExists(getWorkflowDrafFileName(appPath));
+ workflowInfo.setDraftExists(draftExists);
+ boolean workflowExists = hdfsFileUtils.fileExists(getWorkflowFileName(appPath));
+ FileStatus workflowFileStatus = null;
+ if (workflowExists) {
+ workflowFileStatus = hdfsFileUtils
+ .getFileStatus(getWorkflowFileName(appPath));
+ workflowInfo.setWorkflowModificationTime(workflowFileStatus
+ .getModificationTime());
+ }
+ if (draftExists) {
+ FileStatus draftFileStatus = hdfsFileUtils
+ .getFileStatus(getWorkflowDrafFileName(appPath));
+ workflowInfo.setDraftModificationTime(draftFileStatus
+ .getModificationTime());
+ if (!workflowExists) {
+ workflowInfo.setIsDraftCurrent(true);
+ } else {
+ workflowInfo.setIsDraftCurrent(draftFileStatus.getModificationTime()
+ - workflowFileStatus.getModificationTime() > 0);
+ }
+ }
+ return workflowInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java
new file mode 100644
index 0000000..cebc7ea
--- /dev/null
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetDefinitionRepo.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.ambari.view.assets;
+
+import org.apache.ambari.view.DataStore;
+import org.apache.oozie.ambari.view.assets.model.ActionAssetDefinition;
+import org.apache.oozie.ambari.view.repo.BaseRepo;
+
+public class AssetDefinitionRepo extends BaseRepo<ActionAssetDefinition> {
+ public AssetDefinitionRepo(DataStore dataStore) {
+ super(ActionAssetDefinition.class, dataStore);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/47a98824/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
----------------------------------------------------------------------
diff --git a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
index 1390ec0..df936a4 100644
--- a/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
+++ b/contrib/views/wfmanager/src/main/java/org/apache/oozie/ambari/view/assets/AssetRepo.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,26 +17,23 @@
*/
package org.apache.oozie.ambari.view.assets;
-import java.util.Collection;
-
import org.apache.ambari.view.DataStore;
+import org.apache.ambari.view.PersistenceException;
import org.apache.oozie.ambari.view.assets.model.ActionAsset;
+import org.apache.oozie.ambari.view.repo.BaseRepo;
-public class AssetRepo {
- private final DataStore dataStore;
+import java.util.Collection;
- public AssetRepo(DataStore dataStore) {
- super();
- this.dataStore = dataStore;
- }
- public void saveAsset(ActionAsset asset){
-
- }
- public void deleteAsset(ActionAsset asset){
-
- }
- public Collection<ActionAsset>listAllAssets(){
- return null;
- }
+public class AssetRepo extends BaseRepo<ActionAsset> {
+ public AssetRepo(DataStore dataStore) {
+ super(ActionAsset.class, dataStore);
+ }
+ public Collection<ActionAsset> getMyAsets(String userName) {
+ try {
+ return dataStore.findAll(ActionAsset.class, " owner='" + userName + "'");
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ }
}