You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2014/12/02 23:07:55 UTC
helix git commit: [HELIX-546] Add REST API for Helix job queue
management - second part, rb=28584
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 059ab387b -> aa2e968f7
[HELIX-546] Add REST API for Helix job queue management - second part, rb=28584
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/aa2e968f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/aa2e968f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/aa2e968f
Branch: refs/heads/helix-0.6.x
Commit: aa2e968f7ac4944ce60f9defdeee2fb9b8638cb9
Parents: 059ab38
Author: zzhang <zz...@apache.org>
Authored: Tue Dec 2 14:07:34 2014 -0800
Committer: zzhang <zz...@apache.org>
Committed: Tue Dec 2 14:07:34 2014 -0800
----------------------------------------------------------------------
.../apache/helix/webapp/HelixAdminWebApp.java | 16 +-
.../helix/webapp/RestAdminApplication.java | 6 +
.../resources/ClusterRepresentationUtil.java | 4 +-
.../webapp/resources/JobQueueResource.java | 173 +++++++++++++++++
.../webapp/resources/JobQueuesResource.java | 148 +++++++++++++++
.../helix/webapp/resources/JobResource.java | 107 +++++++++++
.../helix/webapp/resources/JsonParameters.java | 21 +-
.../helix/webapp/resources/ResourceUtil.java | 95 ++++++++++
.../org/apache/helix/webapp/AdminTestBase.java | 7 +-
.../apache/helix/webapp/AdminTestHelper.java | 57 ++++++
.../java/resources/TestJobQueuesResource.java | 190 +++++++++++++++++++
.../test/java/resources/TestJsonParameters.java | 44 +++++
.../apache/helix/manager/zk/ZKHelixManager.java | 2 +-
.../java/org/apache/helix/task/TaskDriver.java | 9 +-
.../java/org/apache/helix/task/Workflow.java | 2 +-
pom.xml | 2 +-
16 files changed, 852 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
index 1988636..991886c 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/HelixAdminWebApp.java
@@ -28,12 +28,12 @@ import org.restlet.data.Protocol;
public class HelixAdminWebApp {
public final Logger LOG = Logger.getLogger(HelixAdminWebApp.class);
- RestAdminApplication _adminApp = null;
- Component _component = null;
+ private RestAdminApplication _adminApp = null;
+ private Component _component = null;
- int _helixAdminPort;
- String _zkServerAddress;
- ZkClient _zkClient;
+ private final int _helixAdminPort;
+ private final String _zkServerAddress;
+ private ZkClient _zkClient = null;
public HelixAdminWebApp(String zkServerAddress, int adminPort) {
_zkServerAddress = zkServerAddress;
@@ -58,14 +58,16 @@ public class HelixAdminWebApp {
_component.getDefaultHost().attach(_adminApp);
_component.start();
}
- LOG.info("helixAdminWebApp started on port " + _helixAdminPort);
+ LOG.info("helixAdminWebApp started on port: " + _helixAdminPort);
}
public synchronized void stop() {
+ LOG.info("Stopping helixAdminWebApp");
try {
_component.stop();
+ LOG.info("Stopped helixAdminWebApp");
} catch (Exception e) {
- LOG.error("", e);
+ LOG.error("Exception in stopping helixAdminWebApp", e);
} finally {
if (_zkClient != null) {
_zkClient.close();
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
index 7e7a3b9..9842a3d 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/RestAdminApplication.java
@@ -43,6 +43,9 @@ import org.apache.helix.webapp.resources.ExternalViewResource;
import org.apache.helix.webapp.resources.IdealStateResource;
import org.apache.helix.webapp.resources.InstanceResource;
import org.apache.helix.webapp.resources.InstancesResource;
+import org.apache.helix.webapp.resources.JobQueuesResource;
+import org.apache.helix.webapp.resources.JobQueueResource;
+import org.apache.helix.webapp.resources.JobResource;
import org.apache.helix.webapp.resources.ResourceGroupResource;
import org.apache.helix.webapp.resources.ResourceGroupsResource;
import org.apache.helix.webapp.resources.SchedulerTasksResource;
@@ -92,6 +95,9 @@ public class RestAdminApplication extends Application {
router.attach("/clusters/{clusterName}/resourceGroups/{resourceName}",
ResourceGroupResource.class);
router.attach("/clusters/{clusterName}/workflows", WorkflowsResource.class);
+ router.attach("/clusters/{clusterName}/jobQueues", JobQueuesResource.class);
+ router.attach("/clusters/{clusterName}/jobQueues/{jobQueue}", JobQueueResource.class);
+ router.attach("/clusters/{clusterName}/jobQueues/{jobQueue}/{job}", JobResource.class);
router.attach("/clusters/{clusterName}/instances", InstancesResource.class);
router.attach("/clusters/{clusterName}/instances/{instanceName}", InstanceResource.class);
router.attach("/clusters/{clusterName}/instances/{instanceName}/currentState/{resourceName}",
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
index f227801..5e458c4 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ClusterRepresentationUtil.java
@@ -53,9 +53,7 @@ public class ClusterRepresentationUtil {
private static final ZNRecord EMPTY_ZNRECORD = new ZNRecord("EMPTY_ZNRECORD");
public static String getClusterPropertyAsString(ZkClient zkClient, String clusterName,
- PropertyKey propertyKey,
- // String key,
- MediaType mediaType)
+ PropertyKey propertyKey, MediaType mediaType)
throws JsonGenerationException, JsonMappingException, IOException {
return getClusterPropertyAsString(zkClient, clusterName, mediaType, propertyKey);
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
new file mode 100644
index 0000000..3ff9a37
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueueResource.java
@@ -0,0 +1,173 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
+import org.apache.log4j.Logger;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import java.util.Map;
+
+public class JobQueueResource extends ServerResource {
+ private final static Logger LOG = Logger.getLogger(JobQueueResource.class);
+
+ public JobQueueResource() {
+ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ setNegotiated(false);
+ }
+
+ @Override
+ public Representation get() {
+ StringRepresentation presentation;
+ try {
+ String clusterName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+ String jobQueueName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+ presentation = getHostedEntitiesRepresentation(clusterName, jobQueueName);
+ } catch (Exception e) {
+ String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+ LOG.error("Fail to get job queue", e);
+ }
+ return presentation;
+ }
+
+ StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName)
+ throws Exception {
+ ZkClient zkClient =
+ ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+ HelixDataAccessor accessor =
+ ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // Get job queue config
+ HelixProperty jobQueueConfig = accessor.getProperty(keyBuilder.resourceConfig(jobQueueName));
+
+ // Get job queue context
+ String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+ HelixPropertyStore<ZNRecord> propertyStore =
+ new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null);
+ WorkflowContext ctx = TaskUtil.getWorkflowContext(propertyStore, jobQueueName);
+
+ // Create the result
+ ZNRecord hostedEntitiesRecord = new ZNRecord(jobQueueName);
+ if (jobQueueConfig != null) {
+ hostedEntitiesRecord.merge(jobQueueConfig.getRecord());
+ }
+ if (ctx != null) {
+ hostedEntitiesRecord.merge(ctx.getRecord());
+ }
+
+ StringRepresentation representation =
+ new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+ MediaType.APPLICATION_JSON);
+
+ return representation;
+ }
+
+ @Override
+ public Representation post(Representation entity) {
+ String clusterName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+ String jobQueueName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+ ZkClient zkClient =
+ ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+ try {
+ TaskDriver driver = new TaskDriver(zkClient, clusterName);
+
+ Form form = new Form(entity);
+ JsonParameters jsonParameters = new JsonParameters(form);
+
+ TaskDriver.DriverCommand cmd = TaskDriver.DriverCommand.valueOf(jsonParameters.getCommand());
+ switch (cmd) {
+ case start: {
+ // Get the job queue and submit it
+ String yamlPayload = ResourceUtil.getYamlParameters(form, ResourceUtil.YamlParamKey.NEW_JOB);
+ if (yamlPayload == null) {
+ throw new HelixException("Yaml job config is required!");
+ }
+ Workflow workflow = Workflow.parse(yamlPayload);
+
+ for (String jobName : workflow.getJobConfigs().keySet()) {
+ Map<String, String> jobCfgMap = workflow.getJobConfigs().get(jobName);
+ JobConfig.Builder jobCfgBuilder = JobConfig.Builder.fromMap(jobCfgMap);
+ if (workflow.getTaskConfigs() != null && workflow.getTaskConfigs().containsKey(jobName)) {
+ jobCfgBuilder.addTaskConfigs(workflow.getTaskConfigs().get(jobName));
+ }
+ driver.enqueueJob(jobQueueName, TaskUtil.getDenamespacedJobName(jobQueueName, jobName),
+ jobCfgBuilder);
+ }
+ break;
+ }
+ case stop: {
+ driver.stop(jobQueueName);
+ break;
+ }
+ case resume: {
+ driver.resume(jobQueueName);
+ break;
+ }
+ case flush: {
+ driver.flushQueue(jobQueueName);
+ break;
+ }
+ case delete: {
+ driver.delete(jobQueueName);
+ break;
+ }
+ default:
+ throw new HelixException("Unsupported job queue command: " + cmd);
+ }
+ getResponse().setEntity(getHostedEntitiesRepresentation(clusterName, jobQueueName));
+ getResponse().setStatus(Status.SUCCESS_OK);
+
+ } catch (Exception e) {
+ getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+ MediaType.APPLICATION_JSON);
+ getResponse().setStatus(Status.SUCCESS_OK);
+ LOG.error("Error in posting job queue: " + entity, e);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
new file mode 100644
index 0000000..24a4387
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobQueuesResource.java
@@ -0,0 +1,148 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.data.Parameter;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class JobQueuesResource extends ServerResource {
+ private final static Logger LOG = Logger.getLogger(JobQueuesResource.class);
+
+ public JobQueuesResource() {
+ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ setNegotiated(false);
+ }
+
+ @Override
+ public Representation get() {
+ StringRepresentation presentation = null;
+ try {
+ String clusterName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+ presentation = getHostedEntitiesRepresentation(clusterName);
+ } catch (Exception e) {
+ String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+ LOG.error("Fail to get all job queues", e);
+ }
+ return presentation;
+ }
+
+ StringRepresentation getHostedEntitiesRepresentation(String clusterName)
+ throws JsonGenerationException, JsonMappingException, IOException {
+ // Get all resources
+ ZkClient zkClient =
+ ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+ HelixDataAccessor accessor =
+ ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Map<String, HelixProperty> resourceConfigMap =
+ accessor.getChildValuesMap(keyBuilder.resourceConfigs());
+
+ // Create the result
+ ZNRecord hostedEntitiesRecord = new ZNRecord("JobQueues");
+
+ // Filter out non-workflow resources
+ Iterator<Map.Entry<String, HelixProperty>> it = resourceConfigMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, HelixProperty> e = it.next();
+ HelixProperty resource = e.getValue();
+ Map<String, String> simpleFields = resource.getRecord().getSimpleFields();
+ boolean isTerminable = resource.getRecord().getBooleanField(WorkflowConfig.TERMINABLE, true);
+ if (!simpleFields.containsKey(WorkflowConfig.TARGET_STATE)
+ || !simpleFields.containsKey(WorkflowConfig.DAG) || isTerminable) {
+ it.remove();
+ }
+ }
+
+ // Populate the result
+ List<String> allResources = Lists.newArrayList(resourceConfigMap.keySet());
+ hostedEntitiesRecord.setListField("JobQueues", allResources);
+
+ StringRepresentation representation =
+ new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+ MediaType.APPLICATION_JSON);
+
+ return representation;
+ }
+
+ @Override
+ public Representation post(Representation entity) {
+ try {
+ String clusterName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+ ZkClient zkClient =
+ ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+
+ Form form = new Form(entity);
+ // Get the job queue and submit it
+ if (form.size() < 1) {
+ throw new HelixException("Yaml job queue config is required!");
+ }
+ Parameter payload = form.get(0);
+ String yamlPayload = payload.getName();
+ if (yamlPayload == null) {
+ throw new HelixException("Yaml job queue config is required!");
+ }
+
+ Workflow workflow = Workflow.parse(yamlPayload);
+ JobQueue.Builder jobQueueCfgBuilder = new JobQueue.Builder(workflow.getName());
+ jobQueueCfgBuilder.fromMap(workflow.getWorkflowConfig().getResourceConfigMap());
+ TaskDriver driver = new TaskDriver(zkClient, clusterName);
+ driver.createQueue(jobQueueCfgBuilder.build());
+
+ getResponse().setEntity(getHostedEntitiesRepresentation(clusterName));
+ getResponse().setStatus(Status.SUCCESS_OK);
+ } catch (Exception e) {
+ getResponse().setEntity(ClusterRepresentationUtil.getErrorAsJsonStringFromException(e),
+ MediaType.APPLICATION_JSON);
+ getResponse().setStatus(Status.SUCCESS_OK);
+ LOG.error("Exception in posting job queue: " + entity, e);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
new file mode 100644
index 0000000..a58e223
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JobResource.java
@@ -0,0 +1,107 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskUtil;
+import org.apache.log4j.Logger;
+import org.restlet.data.MediaType;
+import org.restlet.representation.Representation;
+import org.restlet.representation.StringRepresentation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ServerResource;
+
+public class JobResource extends ServerResource {
+ private final static Logger LOG = Logger.getLogger(JobResource.class);
+
+ public JobResource() {
+ getVariants().add(new Variant(MediaType.TEXT_PLAIN));
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ setNegotiated(false);
+ }
+
+ @Override
+ public Representation get() {
+ StringRepresentation presentation;
+ String clusterName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.CLUSTER_NAME);
+ String jobQueueName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB_QUEUE);
+ String jobName =
+ ResourceUtil.getAttributeFromRequest(getRequest(), ResourceUtil.RequestKey.JOB);
+
+ try {
+ presentation = getHostedEntitiesRepresentation(clusterName, jobQueueName, jobName);
+ } catch (Exception e) {
+ String error = ClusterRepresentationUtil.getErrorAsJsonStringFromException(e);
+ presentation = new StringRepresentation(error, MediaType.APPLICATION_JSON);
+
+ LOG.error("Fail to get job: " + jobName, e);
+ }
+ return presentation;
+ }
+
+ StringRepresentation getHostedEntitiesRepresentation(String clusterName, String jobQueueName,
+ String jobName) throws Exception {
+
+ ZkClient zkClient =
+ ResourceUtil.getAttributeFromCtx(getContext(), ResourceUtil.ContextKey.ZKCLIENT);
+ HelixDataAccessor accessor =
+ ClusterRepresentationUtil.getClusterDataAccessor(zkClient, clusterName);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // Get job queue config
+ String namespacedJobName = TaskUtil.getNamespacedJobName(jobQueueName, jobName);
+ HelixProperty jobConfig = accessor.getProperty(keyBuilder.resourceConfig(namespacedJobName));
+
+ // Get job queue context
+ JobContext ctx = null;
+ String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
+ HelixPropertyStore<ZNRecord> propertyStore =
+ new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(zkClient), path, null);
+
+ ctx = TaskUtil.getJobContext(propertyStore, namespacedJobName);
+
+ // Create the result
+ ZNRecord hostedEntitiesRecord = new ZNRecord(namespacedJobName);
+ if (jobConfig != null) {
+ hostedEntitiesRecord.merge(jobConfig.getRecord());
+ }
+ if (ctx != null) {
+ hostedEntitiesRecord.merge(ctx.getRecord());
+ }
+
+ StringRepresentation representation =
+ new StringRepresentation(ClusterRepresentationUtil.ZNRecordToJson(hostedEntitiesRecord),
+ MediaType.APPLICATION_JSON);
+
+ return representation;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
index 19ac71a..41d9a77 100644
--- a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/JsonParameters.java
@@ -96,8 +96,11 @@ public class JsonParameters {
final Map<String, ZNRecord> _extraParameterMap = new HashMap<String, ZNRecord>();
public JsonParameters(Representation entity) throws Exception {
- Form form = new Form(entity);
+ this(new Form(entity));
+ }
+
+ public JsonParameters(Form form) throws Exception {
// get parameters in String format
String jsonPayload = form.getFirstValue(JSON_PARAMETERS, true);
if (jsonPayload == null || jsonPayload.isEmpty()) {
@@ -151,7 +154,7 @@ public class JsonParameters {
}
if (!_parameterMap.containsKey(MANAGEMENT_COMMAND)) {
- throw new HelixException("Missing management paramater '" + MANAGEMENT_COMMAND + "'");
+ throw new HelixException("Missing management parameter '" + MANAGEMENT_COMMAND + "'");
}
if (!_parameterMap.get(MANAGEMENT_COMMAND).equalsIgnoreCase(command)
@@ -217,25 +220,17 @@ public class JsonParameters {
}
} else if (command.equalsIgnoreCase(ClusterSetup.addResource)) {
if (!_parameterMap.containsKey(RESOURCE_GROUP_NAME)) {
- throw new HelixException("Missing Json paramaters: '" + RESOURCE_GROUP_NAME + "'");
+ throw new HelixException("Missing Json parameters: '" + RESOURCE_GROUP_NAME + "'");
}
if (!_parameterMap.containsKey(PARTITIONS)) {
- throw new HelixException("Missing Json paramaters: '" + PARTITIONS + "'");
+ throw new HelixException("Missing Json parameters: '" + PARTITIONS + "'");
}
if (!_parameterMap.containsKey(STATE_MODEL_DEF_REF)) {
- throw new HelixException("Missing Json paramaters: '" + STATE_MODEL_DEF_REF + "'");
+ throw new HelixException("Missing Json parameters: '" + STATE_MODEL_DEF_REF + "'");
}
}
}
-
- // temp test
- public static void main(String[] args) throws Exception {
- String jsonPayload =
- "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}";
- Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload);
- System.out.println(map);
- }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
new file mode 100644
index 0000000..969bdf5
--- /dev/null
+++ b/helix-admin-webapp/src/main/java/org/apache/helix/webapp/resources/ResourceUtil.java
@@ -0,0 +1,95 @@
+package org.apache.helix.webapp.resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.webapp.RestAdminApplication;
+import org.restlet.Context;
+import org.restlet.Request;
+import org.restlet.data.Form;
+import org.restlet.representation.Representation;
+
+public class ResourceUtil {
+ /**
+ * Key enums for getting values from request
+ */
+ public enum RequestKey {
+ CLUSTER_NAME("clusterName"),
+ JOB_QUEUE("jobQueue"),
+ JOB("job");
+
+ private final String _key;
+
+ RequestKey(String key) {
+ _key = key;
+ }
+
+ public String toString() {
+ return _key;
+ }
+ }
+
+ /**
+ * Key enums for getting values from context
+ */
+ public enum ContextKey {
+ ZK_ADDR(RestAdminApplication.ZKSERVERADDRESS),
+ ZKCLIENT(RestAdminApplication.ZKCLIENT);
+
+ private final String _key;
+
+ ContextKey(String key) {
+ _key = key;
+ }
+
+ public String toString() {
+ return _key;
+ }
+ }
+
+ /**
+ * Key enums for getting yaml format parameters
+ */
+ public enum YamlParamKey {
+ NEW_JOB("newJob");
+
+ private final String _key;
+ YamlParamKey(String key) {
+ _key = key;
+ }
+
+ public String toString() {
+ return _key;
+ }
+ }
+
+ public static String getAttributeFromRequest(Request r, RequestKey key) {
+ return (String) r.getAttributes().get(key.toString());
+ }
+
+
+ public static ZkClient getAttributeFromCtx(Context ctx, ContextKey key) {
+ return (ZkClient) ctx.getAttributes().get(key.toString());
+ }
+
+ public static String getYamlParameters(Form form, YamlParamKey key) {
+ return form.getFirstValue(key.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
index 5b4411b..fdccee9 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestBase.java
@@ -58,9 +58,10 @@ public class AdminTestBase {
AssertJUnit.assertTrue(_zkServer != null);
ZKClientPool.reset();
- _gZkClient = new ZkClient(ZK_ADDR);
- _gZkClient.setZkSerializer(new ZNRecordSerializer());
- _gSetupTool = new ClusterSetup(ZK_ADDR);
+ _gZkClient =
+ new ZkClient(ZK_ADDR, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ ZkClient.DEFAULT_SESSION_TIMEOUT, new ZNRecordSerializer());
+ _gSetupTool = new ClusterSetup(_gZkClient);
// start admin
_adminThread = new AdminThread(ZK_ADDR, ADMIN_PORT);
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
index 9f6946d..5b371cc 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/AdminTestHelper.java
@@ -19,9 +19,26 @@ package org.apache.helix.webapp;
* under the License.
*/
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import org.apache.helix.ZNRecord;
import org.apache.helix.webapp.HelixAdminWebApp;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.restlet.Client;
+import org.restlet.Request;
+import org.restlet.Response;
+import org.restlet.data.MediaType;
+import org.restlet.data.Method;
+import org.restlet.data.Reference;
+import org.restlet.data.Status;
+import org.restlet.representation.Representation;
+import org.testng.Assert;
public class AdminTestHelper {
@@ -66,4 +83,44 @@ public class AdminTestHelper {
}
}
+ public static ZNRecord get(Client client, String url) throws IOException {
+ Reference resourceRef = new Reference(url);
+ Request request = new Request(Method.GET, resourceRef);
+ Response response = client.handle(request);
+ Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK);
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+ result.write(sw);
+
+ String responseStr = sw.toString();
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+ return record;
+ }
+
+ public static ZNRecord post(Client client, String url, String body)
+ throws IOException {
+ Reference resourceRef = new Reference(url);
+ Request request = new Request(Method.POST, resourceRef);
+
+ request.setEntity(body, MediaType.APPLICATION_ALL);
+
+ Response response = client.handle(request);
+ Assert.assertEquals(response.getStatus(), Status.SUCCESS_OK);
+ Representation result = response.getEntity();
+ StringWriter sw = new StringWriter();
+
+ if (result != null) {
+ result.write(sw);
+ }
+ String responseStr = sw.toString();
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("error") == -1);
+ Assert.assertTrue(responseStr.toLowerCase().indexOf("exception") == -1);
+
+ ObjectMapper mapper = new ObjectMapper();
+ ZNRecord record = mapper.readValue(new StringReader(responseStr), ZNRecord.class);
+ return record;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java
new file mode 100644
index 0000000..6c0e0e1
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/resources/TestJobQueuesResource.java
@@ -0,0 +1,190 @@
+package resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.DummyTask;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.beans.JobBean;
+import org.apache.helix.task.beans.WorkflowBean;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.webapp.AdminTestBase;
+import org.apache.helix.webapp.AdminTestHelper;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.apache.helix.webapp.resources.ResourceUtil;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.yaml.snakeyaml.Yaml;
+
+public class TestJobQueuesResource extends AdminTestBase {
+ private static final Logger LOG = Logger.getLogger(TestJobQueuesResource.class);
+
+ @Test
+ public void test() throws Exception {
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ final int n = 5;
+ final int p = 20;
+ final int r = 3;
+
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ _gSetupTool.addCluster(clusterName, true);
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+ }
+
+ // Set up target db
+ _gSetupTool.addResourceToCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, p,
+ "MasterSlave");
+ _gSetupTool.rebalanceStorageCluster(clusterName, WorkflowGenerator.DEFAULT_TGT_DB, r);
+
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put("DummyTask", new TaskFactory() {
+ @Override
+ public Task createNewTask(TaskCallbackContext context) {
+ return new DummyTask(context);
+ }
+ });
+
+ // Start dummy participants
+ MockParticipantManager[] participants = new MockParticipantManager[n];
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+
+ // Register a Task state model factory.
+ StateMachineEngine stateMachine = participants[i].getStateMachineEngine();
+ stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(participants[i],
+ taskFactoryReg));
+ participants[i].syncStart();
+ }
+
+ // start controller
+ String controllerName = "controller";
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, controllerName);
+ controller.syncStart();
+
+ boolean result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ // Start a queue
+ String queueName = "myQueue1";
+ LOG.info("Starting job-queue: " + queueName);
+ String jobQueueYamlConfig = "name: " + queueName;
+
+ String resourceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues";
+ ZNRecord postRet = AdminTestHelper.post(_gClient, resourceUrl, jobQueueYamlConfig);
+ LOG.info("Started job-queue: " + queueName + ", ret: " + postRet);
+
+ LOG.info("Getting all job-queues");
+ ZNRecord getRet = AdminTestHelper.get(_gClient, resourceUrl);
+ LOG.info("Got job-queues: " + getRet);
+
+ // Enqueue job
+ resourceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName;
+
+ WorkflowBean wfBean = new WorkflowBean();
+ wfBean.name = queueName;
+ JobBean jBean = new JobBean();
+ jBean.name = "myJob1";
+ jBean.command = "DummyTask";
+ jBean.targetResource = WorkflowGenerator.DEFAULT_TGT_DB;
+ jBean.targetPartitionStates = Lists.newArrayList("MASTER");
+ wfBean.jobs = Lists.newArrayList(jBean);
+ String jobYamlConfig = new Yaml().dump(wfBean);
+ LOG.info("Enqueuing a job: " + jobQueueYamlConfig);
+
+ Map<String, String> paraMap = new HashMap<String, String>();
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.start.toString());
+
+ String postBody =
+ String.format("%s=%s&%s=%s", JsonParameters.JSON_PARAMETERS,
+ ClusterRepresentationUtil.ObjectToJson(paraMap), ResourceUtil.YamlParamKey.NEW_JOB.toString(),
+ jobYamlConfig);
+ postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+ LOG.info("Enqueued job, ret: " + postRet);
+
+ // Get job
+ resourceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName
+ + "/" + jBean.name;
+ getRet = AdminTestHelper.get(_gClient, resourceUrl);
+ LOG.info("Got job: " + getRet);
+
+ // Stop job queue
+ resourceUrl =
+ "http://localhost:" + ADMIN_PORT + "/clusters/" + clusterName + "/jobQueues/" + queueName;
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.stop.toString());
+ postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap));
+ postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+ LOG.info("Stopped job-queue, ret: " + postRet);
+
+ // Resume job queue
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, TaskDriver.DriverCommand.resume.toString());
+ postBody = String.format("%s=%s", JsonParameters.JSON_PARAMETERS, ClusterRepresentationUtil.ObjectToJson(paraMap));
+ postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+ LOG.info("Resumed job-queue, ret: " + postRet);
+
+ // Flush job queue
+ paraMap.put(JsonParameters.MANAGEMENT_COMMAND, "flush");
+ postBody =
+ JsonParameters.JSON_PARAMETERS + "=" + ClusterRepresentationUtil.ObjectToJson(paraMap);
+ postRet = AdminTestHelper.post(_gClient, resourceUrl, postBody);
+ LOG.info("Flushed job-queue, ret: " + postRet);
+
+ // clean up
+ controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ if (participants[i] != null && participants[i].isConnected()) {
+ participants[i].syncStop();
+ }
+ }
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java
new file mode 100644
index 0000000..383ac21
--- /dev/null
+++ b/helix-admin-webapp/src/test/java/resources/TestJsonParameters.java
@@ -0,0 +1,44 @@
+package resources;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
+import org.apache.helix.webapp.resources.JsonParameters;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestJsonParameters {
+ @Test
+ public void test() throws Exception {
+ String jsonPayload =
+ "{\"command\":\"resetPartition\",\"resource\": \"DB-1\",\"partition\":\"DB-1_22 DB-1_23\"}";
+ Map<String, String> map = ClusterRepresentationUtil.JsonToMap(jsonPayload);
+ Assert.assertNotNull(map.get(JsonParameters.MANAGEMENT_COMMAND));
+ Assert.assertEquals(ClusterSetup.resetPartition, map.get(JsonParameters.MANAGEMENT_COMMAND));
+ Assert.assertNotNull(map.get(JsonParameters.RESOURCE));
+ Assert.assertEquals("DB-1", map.get(JsonParameters.RESOURCE));
+ Assert.assertNotNull(map.get(JsonParameters.PARTITION));
+ Assert.assertEquals("DB-1_22 DB-1_23", map.get(JsonParameters.PARTITION));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 6a1fb72..3328279 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -792,7 +792,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
*/
_disconnectTimeHistory.add(System.currentTimeMillis());
if (isFlapping()) {
- LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
+ LOG.error("instanceName: " + _instanceName + " is flapping. disconnect it. "
+ " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
+ _flappingTimeWindowMs + "ms.");
disconnect();
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 0bd060a..60dc22c 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -90,12 +90,13 @@ public class TaskDriver {
private final String _clusterName;
/** Commands which may be parsed from the first argument to main */
- private enum DriverCommand {
+ public enum DriverCommand {
start,
stop,
delete,
resume,
- list
+ list,
+ flush
}
public TaskDriver(HelixManager manager) {
@@ -166,6 +167,10 @@ public class TaskDriver {
break;
case list:
driver.list(resource);
+ break;
+ case flush:
+ driver.flushQueue(resource);
+ break;
default:
throw new IllegalArgumentException("Unknown command " + args[0]);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/helix-core/src/main/java/org/apache/helix/task/Workflow.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
index 1c0ef40..4ca6e68 100644
--- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java
+++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java
@@ -132,7 +132,7 @@ public class Workflow {
WorkflowBean wf = (WorkflowBean) yaml.load(reader);
Builder builder = new Builder(wf.name);
- if (wf != null) {
+ if (wf != null && wf.jobs != null) {
for (JobBean job : wf.jobs) {
if (job.name == null) {
throw new IllegalArgumentException("A job must have a name.");
http://git-wip-us.apache.org/repos/asf/helix/blob/aa2e968f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index df0c74c..92c1d7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -255,7 +255,7 @@ under the License.
<dependency>
<groupId>org.restlet.jse</groupId>
<artifactId>org.restlet</artifactId>
- <version>2.2.1</version>
+ <version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>