You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/06/30 22:23:39 UTC

git commit: [HELIX-455] Add REST API for submitting jobs

Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 7ecee11d6 -> 7e93690ee


[HELIX-455] Add REST API for submitting jobs


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7e93690e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7e93690e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7e93690e

Branch: refs/heads/helix-0.6.x
Commit: 7e93690eeaf9d0f336703cda2d1acf8517d084b3
Parents: 7ecee11
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jun 27 11:39:39 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jun 27 11:39:39 2014 -0700

----------------------------------------------------------------------
 .../helix/webapp/RestAdminApplication.java      |   2 +
 .../webapp/resources/WorkflowsResource.java     | 138 +++++++++++++++++++
 2 files changed, 140 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7e93690e/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
index 0940c39..7e7a3b9 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
@@ -50,6 +50,7 @@ import org.apache.helix.webapp.resources.StateModelResource;
 import org.apache.helix.webapp.resources.StateModelsResource;
 import org.apache.helix.webapp.resources.StatusUpdateResource;
 import org.apache.helix.webapp.resources.StatusUpdatesResource;
+import org.apache.helix.webapp.resources.WorkflowsResource;
 import org.apache.helix.webapp.resources.ZkChildResource;
 import org.apache.helix.webapp.resources.ZkPathResource;
 import org.restlet.Application;
@@ -90,6 +91,7 @@ public class RestAdminApplication extends Application {
     router.attach("/clusters/{clusterName}/resourceGroups", ResourceGroupsResource.class);
     router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}",
         ResourceGroupResource.class);
+    router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class);
     router.attach("/clusters/{clusterName}/instances", InstancesResource.class);
     router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class);
     router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}",

http://git-wip-us.apache.org/repos/asf/helix/blob/7e93690e/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
new file mode 100644
index 0000000..f09155b
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/WorkflowsResource.java
@@ -0,0 +1,138 @@
+package org.apache.helix.webapp.resources;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Parameter;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import com.google.common.collect.Lists;
+
+public class WorkflowsResource extends ServerResource {
+  private final static Logger LOG = Logger.getLogger(WorkflowsResource.class);
+
+  public WorkflowsResource() {
+    getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+    setNegotiated(false);
+  }
+
+  @Override
+  public Representation get() {
+    StringRepresentation presentation = null;
+    try {
+      String clusterName = (String) getRequest().getAttributes().get("clusterName");
+      presentation = getHostedEntitiesRepresentation(clusterName);
+    }
+
+    catch (Exception e) {
+      String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+      presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+      LOG.error("", e);
+    }
+    return presentation;
+  }
+
+  StringRepresentation getHostedEntitiesRepresentation(String clusterName)
+      throws JsonGenerationException, JsonMappingException, IOException {
+    // Get all resources
+    ZkClient zkClient = (ZkClient) getContext().getAttributes().get(RestAdminApplication.ZKCLIENT);
+    HelixDataAccessor accessor =
+        ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Map<String, HelixProperty> resourceConfigMap =
+        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+
+    // Create the result
+    ZNRecord hostedEntitiesRecord = new ZNRecord("Workflows");
+
+    // Filter out non-workflow resources
+    Iterator<Map.Entry<String, HelixProperty>> it = resourceConfigMap.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, HelixProperty> e = it.next();
+      HelixProperty resource = e.getValue();
+      Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
+      if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
+          || !simpleFields.containsKey(WorkflowConfig.DAG)) {
+        it.remove();
+      }
+    }
+
+    // Populate the result
+    List<String> allResources = Lists.newArrayList(resourceConfigMap.keySet());
+    hostedEntitiesRecord.setListField("WorkflowList", allResources);
+
+    StringRepresentation representation =
+        new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+            MediaType.APPLICATION_JSON);
+
+    return representation;
+  }
+
+  @Override
+  public Representation post(Representation entity) {
+    try {
+      String clusterName = (String) getRequest().getAttributes().get("clusterName");
+      Form form = new Form(entity);
+
+      // Get the workflow and submit it
+      if (form.size() < 1) {
+        throw new HelixException("yaml workflow is required!");
+      }
+      Parameter payload = form.get(0);
+      String yamlPayload = payload.getName();
+      if (yamlPayload == null) {
+        throw new HelixException("yaml workflow is required!");
+      }
+      String zkAddr =
+          (String) getContext().getAttributes().get(RestAdminApplication.ZKSERVERADDRESS);
+      HelixManager manager =
+          HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.ADMINISTRATOR,
+              zkAddr);
+      manager.connect();
+      try {
+        Workflow workflow = Workflow.parse(yamlPayload);
+        TaskDriver driver = new TaskDriver(manager);
+        driver.start(workflow);
+      } finally {
+        manager.disconnect();
+      }
+
+      getResponse().setEntity(getHostedEntitiesRepresentation(clusterName));
+      getResponse().setStatus(Status.SUCCESS_OK);
+    }
+
+    catch (Exception e) {
+      getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+          MediaType.APPLICATION_JSON);
+      getResponse().setStatus(Status.SUCCESS_OK);
+      LOG.error("Error in posting " + entity, e);
+    }
+    return null;
+  }
+}