You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/06/30 22:23:39 UTC
git commit: [HELIX-455] Add REST API for submitting jobs
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 7ecee11d6 -> 7e93690ee
[HELIX-455] Add REST API for submitting jobs
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7e93690e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7e93690e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7e93690e
Branch: refs/heads/helix-0.6.x
Commit: 7e93690eeaf9d0f336703cda2d1acf8517d084b3
Parents: 7ecee11
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jun 27 11:39:39 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jun 27 11:39:39 2014 -0700
----------------------------------------------------------------------
.../helix/webapp/RestAdminApplication.java | 2 +
.../webapp/resources/WorkflowsResource.java | 138 +++++++++++++++++++
2 files changed, 140 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7e93690e/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
index 0940c39..7e7a3b9 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
@@ -50,6 +50,7 @@ import org.apache.helix.webapp.resources.StateModelResource;
import org.apache.helix.webapp.resources.StateModelsResource;
import org.apache.helix.webapp.resources.StatusUpdateResource;
import org.apache.helix.webapp.resources.StatusUpdatesResource;
+import org.apache.helix.webapp.resources.WorkflowsResource;
import org.apache.helix.webapp.resources.ZkChildResource;
import org.apache.helix.webapp.resources.ZkPathResource;
import org.restlet.Application;
@@ -90,6 +91,7 @@ public class RestAdminApplication extends Application {
router.attach("/clusters/{clusterName}/resourceGroups", ResourceGroupsResource.class);
router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}",
ResourceGroupResource.class);
+ router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class);
router.attach("/clusters/{clusterName}/instances", InstancesResource.class);
router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class);
router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}",
http://git-wip-us.apache.org/repos/asf/helix/blob/7e93690e/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
new file mode 100644
index 0000000..f09155b
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -0,0 +1,138 @@
+package org.apache.helix.webapp.resources;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Parameter;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import com.google.common.collect.Lists;
+
+public class WorkflowsResource extends ServerResource {
+ private final static Logger LOG = Logger.getLogger(WorkflowsResource.class);
+
+ public WorkflowsResource() {
+ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ setNegotiated(false);
+ }
+
+ @Override
+ public Representation get() {
+ StringRepresentation presentation = null;
+ try {
+ String clusterName = (String) getRequest().getAttributes().get("clusterName");
+ presentation = getHostedEntitiesRepresentation(clusterName);
+ }
+
+ catch (Exception e) {
+ String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+ LOG.error("", e);
+ }
+ return presentation;
+ }
+
+ StringRepresentation getHostedEntitiesRepresentation(String clusterName)
+ throws JsonGenerationException, JsonMappingException, IOException {
+ // Get all resources
+ ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+ HelixDataAccessor accessor =
+ ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Map<String, HelixProperty> resourceConfigMap =
+ accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+
+ // Create the result
+ ZNRecord hostedEntitiesRecord = new ZNRecord("Workflows");
+
+ // Filter out non-workflow resources
+ Iterator<Map.Entry<String, HelixProperty>> it = resourceConfigMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, HelixProperty> e = it.next();
+ HelixProperty resource = e.getValue();
+ Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
+ if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
+ || !simpleFields.containsKey(WorkflowConfig.DAG)) {
+ it.remove();
+ }
+ }
+
+ // Populate the result
+ List<String> allResources = Lists.newArrayList(resourceConfigMap.keySet());
+ hostedEntitiesRecord.setListField("WorkflowList", allResources);
+
+ StringRepresentation representation =
+ new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+ MediaType.APPLICATION_JSON);
+
+ return representation;
+ }
+
+ @Override
+ public Representation post(Representation entity) {
+ try {
+ String clusterName = (String) getRequest().getAttributes().get("clusterName");
+ Form form = new Form(entity);
+
+ // Get the workflow and submit it
+ if (form.size() < 1) {
+ throw new HelixException("yaml workflow is required!");
+ }
+ Parameter payload = form.get(0);
+ String yamlPayload = payload.getName();
+ if (yamlPayload == null) {
+ throw new HelixException("yaml workflow is required!");
+ }
+ String zkAddr =
+ (String) getContext().getAttributes().get(RestAdminApplication.ZKSERVERADDRESS);
+ HelixManager manager =
+ HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.ADMINISTRATOR,
+ zkAddr);
+ manager.connect();
+ try {
+ Workflow workflow = Workflow.parse(yamlPayload);
+ TaskDriver driver = new TaskDriver(manager);
+ driver.start(workflow);
+ } finally {
+ manager.disconnect();
+ }
+
+ getResponse().setEntity(getHostedEntitiesRepresentation(clusterName));
+ getResponse().setStatus(Status.SUCCESS_OK);
+ }
+
+ catch (Exception e) {
+ getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+ MediaType.APPLICATION_JSON);
+ getResponse().setStatus(Status.SUCCESS_OK);
+ LOG.error("Error in posting " + entity, e);
+ }
+ return null;
+ }
+}