You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:49:26 UTC
[35/50] [abbrv] helix git commit: HELIX-661: implement GET
namespace(s)
HELIX-661: implement GET namespace(s)
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/40710b27
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/40710b27
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/40710b27
Branch: refs/heads/master
Commit: 40710b2713ea0e4f1d4a936396c98ef01f8e2b68
Parents: 4ff98fb
Author: hrzhang <hr...@linkedin.com>
Authored: Wed Dec 13 18:57:40 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:32:39 2018 -0800
----------------------------------------------------------------------
.../helix/rest/common/ContextPropertyKeys.java | 3 +-
.../helix/rest/common/HelixRestNamespace.java | 17 +-
.../helix/rest/common/HelixRestUtils.java | 15 +-
.../apache/helix/rest/common/ServletType.java | 56 ++
.../helix/rest/server/HelixRestServer.java | 78 +--
.../rest/server/resources/AbstractResource.java | 40 --
.../rest/server/resources/ClusterAccessor.java | 400 --------------
.../rest/server/resources/InstanceAccessor.java | 545 -------------------
.../rest/server/resources/JobAccessor.java | 200 -------
.../rest/server/resources/ResourceAccessor.java | 278 ----------
.../server/resources/UIResourceAccessor.java | 62 ---
.../rest/server/resources/WorkflowAccessor.java | 325 -----------
.../resources/helix/AbstractHelixResource.java | 79 +++
.../server/resources/helix/ClusterAccessor.java | 400 ++++++++++++++
.../resources/helix/InstanceAccessor.java | 545 +++++++++++++++++++
.../server/resources/helix/JobAccessor.java | 200 +++++++
.../resources/helix/MetadataAccessor.java | 45 ++
.../resources/helix/ResourceAccessor.java | 278 ++++++++++
.../resources/helix/WorkflowAccessor.java | 325 +++++++++++
.../resources/metadata/NamespacesAccessor.java | 47 ++
.../helix/rest/server/TestClusterAccessor.java | 2 +-
.../helix/rest/server/TestHelixRestServer.java | 5 +-
.../helix/rest/server/TestInstanceAccessor.java | 4 +-
.../helix/rest/server/TestJobAccessor.java | 4 +-
.../rest/server/TestNamespacedAPIAccess.java | 55 ++
.../helix/rest/server/TestResourceAccessor.java | 2 +-
.../helix/rest/server/TestWorkflowAccessor.java | 2 +-
27 files changed, 2099 insertions(+), 1913 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java b/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java
index ce59abc..ffe1283 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/ContextPropertyKeys.java
@@ -21,5 +21,6 @@ package org.apache.helix.rest.common;
public enum ContextPropertyKeys {
SERVER_CONTEXT,
- NAMESPACE
+ METADATA,
+ ALL_NAMESPACES
}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
index 5d1c8f3..a2fb52c 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java
@@ -19,10 +19,15 @@ package org.apache.helix.rest.common;
* under the License.
*/
+import java.util.HashMap;
+import java.util.Map;
+
+
public class HelixRestNamespace {
public enum HelixMetadataStoreType {
- ZOOKEEPER
+ ZOOKEEPER,
+ NO_METADATA_STORE
}
public enum HelixRestNamespaceProperty {
@@ -78,7 +83,8 @@ public class HelixRestNamespace {
if (_name == null || _name.length() == 0) {
throw new IllegalArgumentException("Name of namespace not provided");
}
- if (_metadataStoreAddress == null || _metadataStoreAddress.isEmpty()) {
+ if (_metadataStoreType != HelixMetadataStoreType.NO_METADATA_STORE && (_metadataStoreAddress == null
+ || _metadataStoreAddress.isEmpty())) {
throw new IllegalArgumentException(
String.format("Metadata store address \"%s\" is not valid for namespace %s", _metadataStoreAddress, _name));
}
@@ -96,4 +102,11 @@ public class HelixRestNamespace {
return _metadataStoreAddress;
}
+ public Map<String, String> getRestInfo() {
+ // In REST APIs we currently don't expose metadata store information
+ Map<String, String> ret = new HashMap<>();
+ ret.put(HelixRestNamespaceProperty.NAME.name(), _name);
+ ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault));
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java
index 6c4a3df..39491ab 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestUtils.java
@@ -21,17 +21,6 @@ package org.apache.helix.rest.common;
public class HelixRestUtils {
/**
- * Generate servlet path spec for a given namespace.
- * @param namespace Name of the namespace
- * @param isDefaultServlet mark this as true to get path spec for the special servlet for default namespace
- * @return servlet path spec
- */
- public static String makeServletPathSpec(String namespace, boolean isDefaultServlet) {
- return isDefaultServlet ? HelixRestNamespace.DEFAULT_NAMESPACE_PATH_SPEC
- : String.format("/namespaces/%s/*", namespace);
- }
-
- /**
* Extract namespace information from servlet path. There are 3 cases:
* 1. /namespaces/namespaceName -> return namespaceName
* 2. /namespaces -> return ""
@@ -40,7 +29,7 @@ public class HelixRestUtils {
* @return Namespace name retrieved from servlet spec.
*/
public static String getNamespaceFromServletPath(String servletPath) {
- if (isDefaultNamespaceServlet(servletPath)) {
+ if (isDefaultServlet(servletPath)) {
return HelixRestNamespace.DEFAULT_NAMESPACE_NAME;
}
@@ -52,7 +41,7 @@ public class HelixRestUtils {
}
}
- private static boolean isDefaultNamespaceServlet(String servletPath) {
+ public static boolean isDefaultServlet(String servletPath) {
// Special servlet for default namespace has path spec "/*", so servletPath is empty
return servletPath == null || servletPath.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java b/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java
new file mode 100644
index 0000000..bbff2d6
--- /dev/null
+++ b/helix-rest/src/main/java/org/apache/helix/rest/common/ServletType.java
@@ -0,0 +1,56 @@
+package org.apache.helix.rest.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.rest.server.resources.helix.AbstractHelixResource;
+import org.apache.helix.rest.server.resources.metadata.NamespacesAccessor;
+
+public enum ServletType {
+ /**
+ * Servlet serving default API endpoints (/admin/v2/clusters/...)
+ */
+ DEFAULT_SERVLET(HelixRestNamespace.DEFAULT_NAMESPACE_PATH_SPEC,
+ new String[] { AbstractHelixResource.class.getPackage().getName(),
+ NamespacesAccessor.class.getPackage().getName()
+ }),
+
+ /**
+ * Servlet serving namespaced API endpoints (/admin/v2/namespaces/{namespaceName})
+ */
+ COMMON_SERVLET("/namespaces/%s/*",
+ new String[] { AbstractHelixResource.class.getPackage().getName(),
+ });
+
+ private final String _servletPathSpecTemplate;
+ private final String[] _servletPackageArray;
+
+ ServletType(String servletPathSpecTemplate, String[] servletPackageArray) {
+ _servletPathSpecTemplate = servletPathSpecTemplate;
+ _servletPackageArray = servletPackageArray;
+ }
+
+ public String getServletPathSpecTemplate() {
+ return _servletPathSpecTemplate;
+ }
+
+ public String[] getServletPackageArray() {
+ return _servletPackageArray;
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
index 3737308..e0c1c4e 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java
@@ -27,11 +27,10 @@ import java.util.Map;
import org.apache.helix.HelixException;
import org.apache.helix.rest.common.ContextPropertyKeys;
import org.apache.helix.rest.common.HelixRestNamespace;
-import org.apache.helix.rest.common.HelixRestUtils;
+import org.apache.helix.rest.common.ServletType;
import org.apache.helix.rest.server.auditlog.AuditLogger;
import org.apache.helix.rest.server.filters.AuditLogFilter;
import org.apache.helix.rest.server.filters.CORSFilter;
-import org.apache.helix.rest.server.resources.AbstractResource;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
@@ -53,18 +52,13 @@ public class HelixRestServer {
private int _port;
private String _urlPrefix;
private Server _server;
+ private List<HelixRestNamespace> _helixNamespaces;
private ServletContextHandler _servletContextHandler;
private List<AuditLogger> _auditLoggers;
// Key is name of namespace, value of the resource config of that namespace
private Map<String, ResourceConfig> _resourceConfigMap;
- // In additional to regular servlets serving namespaced API endpoints, We have a default servlet
- // serving un-namespaced API (/admin/v2/clusters/...) for default namespace as well. We use this
- // literal as a key in _resourceConfigMap to keep records for default servlet.
- // TODO: try to find a way to serve 2 sets of endpoints of default namespace in 1 servlet
- private static final String DEFAULT_SERVLET_KEY = "DefaultServlet";
-
public HelixRestServer(String zkAddr, int port, String urlPrefix) {
this(zkAddr, port, urlPrefix, Collections.<AuditLogger>emptyList());
}
@@ -77,7 +71,8 @@ public class HelixRestServer {
init(namespaces, port, urlPrefix, auditLoggers);
}
- public HelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix, List<AuditLogger> auditLoggers) {
+ public HelixRestServer(List<HelixRestNamespace> namespaces, int port, String urlPrefix,
+ List<AuditLogger> auditLoggers) {
init(namespaces, port, urlPrefix, auditLoggers);
}
@@ -93,32 +88,16 @@ public class HelixRestServer {
_auditLoggers = auditLoggers;
_resourceConfigMap = new HashMap<>();
_servletContextHandler = new ServletContextHandler(_server, _urlPrefix);
+ _helixNamespaces = namespaces;
// Initialize all namespaces
try {
- for (HelixRestNamespace namespace : namespaces) {
+ for (HelixRestNamespace namespace : _helixNamespaces) {
LOG.info("Initializing namespace " + namespace.getName());
- if (_resourceConfigMap.containsKey(namespace.getName())) {
- throw new IllegalArgumentException(String.format("Duplicated namespace name \"%s\"", namespace.getName()));
- }
-
- // Create resource and context for namespaced servlet
- _resourceConfigMap.put(namespace.getName(),
- makeResourceConfig(namespace, AbstractResource.class.getPackage().getName()));
- LOG.info("Initializing servlet for namespace " + namespace.getName());
- initServlet(_resourceConfigMap.get(namespace.getName()),
- HelixRestUtils.makeServletPathSpec(namespace.getName(), false));
-
- // Create special resource and context for default namespace servlet
+ prepareServlet(namespace, ServletType.COMMON_SERVLET);
if (namespace.isDefault()) {
- if (_resourceConfigMap.containsKey(DEFAULT_SERVLET_KEY)) {
- throw new IllegalArgumentException("More than 1 default namespaces are provided");
- }
- LOG.info("Creating special servlet for default namespace");
- _resourceConfigMap.put(DEFAULT_SERVLET_KEY,
- makeResourceConfig(namespace, AbstractResource.class.getPackage().getName()));
- initServlet(_resourceConfigMap.get(DEFAULT_SERVLET_KEY),
- HelixRestUtils.makeServletPathSpec(namespace.getName(), true));
+ LOG.info("Creating default servlet for default namespace");
+ prepareServlet(namespace, ServletType.DEFAULT_SERVLET);
}
}
} catch (Exception e) {
@@ -135,12 +114,39 @@ public class HelixRestServer {
}));
}
- private ResourceConfig makeResourceConfig(HelixRestNamespace ns, String... packages) {
+ private void prepareServlet(HelixRestNamespace namespace, ServletType type) {
+ String resourceConfigMapKey = getResourceConfigMapKey(type, namespace);
+ if (_resourceConfigMap.containsKey(resourceConfigMapKey)) {
+ throw new IllegalArgumentException(
+ String.format("Duplicated namespace name \"%s\"", namespace.getName()));
+ }
+
+ // Prepare resource config
+ ResourceConfig config = getResourceConfig(namespace, type);
+ _resourceConfigMap.put(resourceConfigMapKey, config);
+
+ // Initialize servlet
+ initServlet(config, String.format(type.getServletPathSpecTemplate(), namespace.getName()));
+ }
+
+ private String getResourceConfigMapKey(ServletType type, HelixRestNamespace namespace) {
+ return String.format("%s_%s", type.name(), namespace.getName());
+ }
+
+ private ResourceConfig getResourceConfig(HelixRestNamespace namespace, ServletType type) {
ResourceConfig cfg = new ResourceConfig();
- cfg.packages(packages)
- .property(ContextPropertyKeys.SERVER_CONTEXT.name(), new ServerContext(ns.getMetadataStoreAddress()))
- .register(new CORSFilter())
- .register(new AuditLogFilter(_auditLoggers));
+ cfg.packages(type.getServletPackageArray());
+
+ cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(),
+ new ServerContext(namespace.getMetadataStoreAddress()));
+ if (type == ServletType.DEFAULT_SERVLET) {
+ cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces);
+ } else {
+ cfg.property(ContextPropertyKeys.METADATA.name(), namespace);
+ }
+
+ cfg.register(new CORSFilter());
+ cfg.register(new AuditLogFilter(_auditLoggers));
return cfg;
}
@@ -186,7 +192,7 @@ public class HelixRestServer {
for (Map.Entry<String, ResourceConfig> e : _resourceConfigMap.entrySet()) {
ServerContext ctx = (ServerContext) e.getValue().getProperty(ContextPropertyKeys.SERVER_CONTEXT.name());
if (ctx == null) {
- LOG.warn("Server context for servlet " + e.getKey() + " is null.");
+ LOG.info("Server context for servlet " + e.getKey() + " is null.");
} else {
LOG.info("Closing context for servlet " + e.getKey());
ctx.close();
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index a89ae5d..e3c565d 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -83,42 +83,6 @@ public class AbstractResource {
protected HttpServletRequest _servletRequest;
protected AuditLog.Builder _auditLogBuilder;
- public ZkClient getZkClient() {
- ServerContext serverContext = (ServerContext) _application.getProperties()
- .get(ContextPropertyKeys.SERVER_CONTEXT.name());
- return serverContext.getZkClient();
- }
-
- public HelixAdmin getHelixAdmin() {
- ServerContext serverContext = (ServerContext) _application.getProperties()
- .get(ContextPropertyKeys.SERVER_CONTEXT.name());
- return serverContext.getHelixAdmin();
- }
-
- public ClusterSetup getClusterSetup() {
- ServerContext serverContext = (ServerContext) _application.getProperties()
- .get(ContextPropertyKeys.SERVER_CONTEXT.name());
- return serverContext.getClusterSetup();
- }
-
- public TaskDriver getTaskDriver(String clusterName) {
- ServerContext serverContext = (ServerContext) _application.getProperties()
- .get(ContextPropertyKeys.SERVER_CONTEXT.name());
- return serverContext.getTaskDriver(clusterName);
- }
-
- public ConfigAccessor getConfigAccessor() {
- ServerContext serverContext = (ServerContext) _application.getProperties()
- .get(ContextPropertyKeys.SERVER_CONTEXT.name());
- return serverContext.getConfigAccessor();
- }
-
- public HelixDataAccessor getDataAccssor(String clusterName) {
- ServerContext serverContext = (ServerContext) _application.getProperties()
- .get(ContextPropertyKeys.SERVER_CONTEXT.name());
- return serverContext.getDataAccssor(clusterName);
- }
-
protected void addExceptionToAuditLog(Exception ex) {
if (_auditLogBuilder == null) {
_auditLogBuilder =
@@ -196,10 +160,6 @@ public class AbstractResource {
return sw.toString();
}
- protected static ZNRecord toZNRecord(String data) throws IOException {
- return OBJECT_MAPPER.reader(ZNRecord.class).readValue(data);
- }
-
protected Command getCommand(String commandStr) throws HelixException {
if (commandStr == null) {
throw new HelixException("Unknown command " + commandStr);
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java
deleted file mode 100644
index 1c998b7..0000000
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ClusterAccessor.java
+++ /dev/null
@@ -1,400 +0,0 @@
-package org.apache.helix.rest.server.resources;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Response;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.model.LeaderHistory;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.StateModelDefinition;
-import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.tools.ClusterSetup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Path("/clusters")
-public class ClusterAccessor extends AbstractResource {
- private static Logger _logger = LoggerFactory.getLogger(ClusterAccessor.class.getName());
-
- public enum ClusterProperties {
- controller,
- instances,
- liveInstances,
- resources,
- paused,
- maintenance,
- messages,
- stateModelDefinitions,
- clusters
- }
-
- @GET
- public Response getClusters() {
- HelixAdmin helixAdmin = getHelixAdmin();
- List<String> clusters = helixAdmin.getClusters();
-
- Map<String, List<String>> dataMap = new HashMap<>();
- dataMap.put(ClusterProperties.clusters.name(), clusters);
-
- return JSONRepresentation(dataMap);
- }
-
- @GET
- @Path("{clusterId}")
- public Response getClusterInfo(@PathParam("clusterId") String clusterId) {
- if (!isClusterExist(clusterId)) {
- return notFound();
- }
-
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
-
- Map<String, Object> clusterInfo = new HashMap<>();
- clusterInfo.put(Properties.id.name(), clusterId);
-
- LiveInstance controller = dataAccessor.getProperty(keyBuilder.controllerLeader());
- if (controller != null) {
- clusterInfo.put(ClusterProperties.controller.name(), controller.getInstanceName());
- } else {
- clusterInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
- }
-
- boolean paused = (dataAccessor.getProperty(keyBuilder.pause()) == null ? false : true);
- clusterInfo.put(ClusterProperties.paused.name(), paused);
- boolean maintenance =
- (dataAccessor.getProperty(keyBuilder.maintenance()) == null ? false : true);
- clusterInfo.put(ClusterProperties.maintenance.name(), maintenance);
-
- List<String> idealStates = dataAccessor.getChildNames(keyBuilder.idealStates());
- clusterInfo.put(ClusterProperties.resources.name(), idealStates);
- List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
- clusterInfo.put(ClusterProperties.instances.name(), instances);
- List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
- clusterInfo.put(ClusterProperties.liveInstances.name(), liveInstances);
-
- return JSONRepresentation(clusterInfo);
- }
-
-
- @PUT
- @Path("{clusterId}")
- public Response createCluster(@PathParam("clusterId") String clusterId,
- @DefaultValue("false") @QueryParam("recreate") String recreate) {
- boolean recreateIfExists = Boolean.valueOf(recreate);
- ClusterSetup clusterSetup = getClusterSetup();
-
- try {
- clusterSetup.addCluster(clusterId, recreateIfExists);
- } catch (Exception ex) {
- _logger.error("Failed to create cluster " + clusterId + ", exception: " + ex);
- return serverError(ex);
- }
-
- return created();
- }
-
- @DELETE
- @Path("{clusterId}")
- public Response deleteCluster(@PathParam("clusterId") String clusterId) {
- ClusterSetup clusterSetup = getClusterSetup();
-
- try {
- clusterSetup.deleteCluster(clusterId);
- } catch (HelixException ex) {
- _logger.info(
- "Failed to delete cluster " + clusterId + ", cluster is still in use. Exception: " + ex);
- return badRequest(ex.getMessage());
- } catch (Exception ex) {
- _logger.error("Failed to delete cluster " + clusterId + ", exception: " + ex);
- return serverError(ex);
- }
-
- return OK();
- }
-
- @POST
- @Path("{clusterId}")
- public Response updateCluster(@PathParam("clusterId") String clusterId,
- @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster,
- String content) {
- Command command;
- try {
- command = getCommand(commandStr);
- } catch (HelixException ex) {
- return badRequest(ex.getMessage());
- }
-
- ClusterSetup clusterSetup = getClusterSetup();
- HelixAdmin helixAdmin = getHelixAdmin();
-
- switch (command) {
- case activate:
- if (superCluster == null) {
- return badRequest("Super Cluster name is missing!");
- }
- try {
- clusterSetup.activateCluster(clusterId, superCluster, true);
- } catch (Exception ex) {
- _logger.error("Failed to add cluster " + clusterId + " to super cluster " + superCluster);
- return serverError(ex);
- }
- break;
-
- case expand:
- try {
- clusterSetup.expandCluster(clusterId);
- } catch (Exception ex) {
- _logger.error("Failed to expand cluster " + clusterId);
- return serverError(ex);
- }
- break;
-
- case enable:
- try {
- helixAdmin.enableCluster(clusterId, true);
- } catch (Exception ex) {
- _logger.error("Failed to enable cluster " + clusterId);
- return serverError(ex);
- }
- break;
-
- case disable:
- try {
- helixAdmin.enableCluster(clusterId, false);
- } catch (Exception ex) {
- _logger.error("Failed to disable cluster " + clusterId);
- return serverError(ex);
- }
- break;
- case enableMaintenanceMode:
- try {
- helixAdmin.enableMaintenanceMode(clusterId, true, content);
- } catch (Exception ex) {
- _logger.error("Failed to enable maintenance mode " + clusterId);
- return serverError(ex);
- }
- break;
- case disableMaintenanceMode:
- try {
- helixAdmin.enableMaintenanceMode(clusterId, false);
- } catch (Exception ex) {
- _logger.error("Failed to disable maintenance mode " + clusterId);
- return serverError(ex);
- }
- break;
- default:
- return badRequest("Unsupported command " + command);
- }
-
- return OK();
- }
-
-
- @GET
- @Path("{clusterId}/configs")
- public Response getClusterConfig(@PathParam("clusterId") String clusterId) {
- ConfigAccessor accessor = getConfigAccessor();
- ClusterConfig config = null;
- try {
- config = accessor.getClusterConfig(clusterId);
- } catch (HelixException ex) {
- // cluster not found.
- _logger.info("Failed to get cluster config for cluster " + clusterId
- + ", cluster not found, Exception: " + ex);
- } catch (Exception ex) {
- _logger.error("Failed to get cluster config for cluster " + clusterId + " Exception: " + ex);
- return serverError(ex);
- }
- if (config == null) {
- return notFound();
- }
- return JSONRepresentation(config.getRecord());
- }
-
- @POST
- @Path("{clusterId}/configs")
- public Response updateClusterConfig(
- @PathParam("clusterId") String clusterId, @QueryParam("command") String commandStr,
- String content) {
- Command command;
- try {
- command = getCommand(commandStr);
- } catch (HelixException ex) {
- return badRequest(ex.getMessage());
- }
-
- ZNRecord record;
- try {
- record = toZNRecord(content);
- } catch (IOException e) {
- _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
- return badRequest("Input is not a valid ZNRecord!");
- }
-
- if (!record.getId().equals(clusterId)) {
- return badRequest("ID does not match the cluster name in input!");
- }
-
- ClusterConfig config = new ClusterConfig(record);
- ConfigAccessor configAccessor = getConfigAccessor();
- try {
- switch (command) {
- case update:
- configAccessor.updateClusterConfig(clusterId, config);
- break;
- case delete: {
- HelixConfigScope clusterScope =
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
- .forCluster(clusterId).build();
- configAccessor.remove(clusterScope, config.getRecord());
- }
- break;
-
- default:
- return badRequest("Unsupported command " + commandStr);
- }
- } catch (HelixException ex) {
- return notFound(ex.getMessage());
- } catch (Exception ex) {
- _logger.error(
- "Failed to " + command + " cluster config, cluster " + clusterId + " new config: "
- + content + ", Exception: " + ex);
- return serverError(ex);
- }
- return OK();
- }
-
- @GET
- @Path("{clusterId}/controller")
- public Response getClusterController(@PathParam("clusterId") String clusterId) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- Map<String, Object> controllerInfo = new HashMap<>();
- controllerInfo.put(Properties.id.name(), clusterId);
-
- LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader());
- if (leader != null) {
- controllerInfo.put(ClusterProperties.controller.name(), leader.getInstanceName());
- controllerInfo.putAll(leader.getRecord().getSimpleFields());
- } else {
- controllerInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
- }
-
- return JSONRepresentation(controllerInfo);
- }
-
- @GET
- @Path("{clusterId}/controller/history")
- public Response getClusterControllerHistory(@PathParam("clusterId") String clusterId) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- Map<String, Object> controllerHistory = new HashMap<>();
- controllerHistory.put(Properties.id.name(), clusterId);
-
- LeaderHistory history =
- dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeaderHistory());
- if (history != null) {
- controllerHistory.put(Properties.history.name(), history.getHistoryList());
- } else {
- controllerHistory.put(Properties.history.name(), Collections.emptyList());
- }
-
- return JSONRepresentation(controllerHistory);
- }
-
- @GET
- @Path("{clusterId}/controller/messages")
- public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
-
- Map<String, Object> controllerMessages = new HashMap<>();
- controllerMessages.put(Properties.id.name(), clusterId);
-
- List<String> messages =
- dataAccessor.getChildNames(dataAccessor.keyBuilder().controllerMessages());
- controllerMessages.put(ClusterProperties.messages.name(), messages);
- controllerMessages.put(Properties.count.name(), messages.size());
-
- return JSONRepresentation(controllerMessages);
- }
-
- @GET
- @Path("{clusterId}/controller/messages/{messageId}")
- public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId, @PathParam("messageId") String messageId) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- Message message = dataAccessor.getProperty(
- dataAccessor.keyBuilder().controllerMessage(messageId));
- return JSONRepresentation(message.getRecord());
- }
-
- @GET
- @Path("{clusterId}/statemodeldefs")
- public Response getClusterStateModelDefinitions(@PathParam("clusterId") String clusterId) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- List<String> stateModelDefs =
- dataAccessor.getChildNames(dataAccessor.keyBuilder().stateModelDefs());
-
- Map<String, Object> clusterStateModelDefs = new HashMap<>();
- clusterStateModelDefs.put(Properties.id.name(), clusterId);
- clusterStateModelDefs.put(ClusterProperties.stateModelDefinitions.name(), stateModelDefs);
-
- return JSONRepresentation(clusterStateModelDefs);
- }
-
- @GET
- @Path("{clusterId}/statemodeldefs/{statemodel}")
- public Response getClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
- @PathParam("statemodel") String statemodel) {
- HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
- StateModelDefinition stateModelDef =
- dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(statemodel));
-
- return JSONRepresentation(stateModelDef.getRecord());
- }
-
- private boolean isClusterExist(String cluster) {
- ZkClient zkClient = getZkClient();
- if (ZKUtil.isClusterSetup(cluster, zkClient)) {
- return true;
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
deleted file mode 100644
index eeecba9..0000000
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
+++ /dev/null
@@ -1,545 +0,0 @@
-package org.apache.helix.rest.server.resources;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Response;
-
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.Error;
-import org.apache.helix.model.HealthStat;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.Message;
-import org.apache.helix.model.ParticipantHistory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.codehaus.jackson.node.ObjectNode;
-
-@Path("/clusters/{clusterId}/instances")
-public class InstanceAccessor extends AbstractResource {
- private final static Logger _logger = LoggerFactory.getLogger(InstanceAccessor.class);
-
- public enum InstanceProperties {
- instances,
- online,
- disabled,
- config,
- liveInstance,
- resource,
- resources,
- partitions,
- errors,
- new_messages,
- read_messages,
- total_message_count,
- read_message_count,
- healthreports,
- instanceTags
- }
-
- @GET
- public Response getInstances(@PathParam("clusterId") String clusterId) {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
-
- ArrayNode instancesNode = root.putArray(InstanceProperties.instances.name());
- ArrayNode onlineNode = root.putArray(InstanceProperties.online.name());
- ArrayNode disabledNode = root.putArray(InstanceProperties.disabled.name());
-
- List<String> instances = accessor.getChildNames(accessor.keyBuilder().instanceConfigs());
-
- if (instances != null) {
- instancesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(instances));
- } else {
- return notFound();
- }
-
- List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
- ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
-
- for (String instanceName : instances) {
- InstanceConfig instanceConfig =
- accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
- if (instanceConfig != null) {
- if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
- && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
- disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
- }
-
- if (liveInstances.contains(instanceName)){
- onlineNode.add(JsonNodeFactory.instance.textNode(instanceName));
- }
- }
- }
-
- return JSONRepresentation(root);
- }
-
- @POST
- public Response updateInstances(@PathParam("clusterId") String clusterId,
- @QueryParam("command") String command, String content) {
- Command cmd;
- try {
- cmd = Command.valueOf(command);
- } catch (Exception e) {
- return badRequest("Invalid command : " + command);
- }
-
- HelixAdmin admin = getHelixAdmin();
- try {
- JsonNode node = null;
- if (content.length() != 0) {
- node = OBJECT_MAPPER.readTree(content);
- }
- if (node == null) {
- return badRequest("Invalid input for content : " + content);
- }
- List<String> enableInstances = OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instances.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
- switch (cmd) {
- case enable:
- admin.enableInstance(clusterId, enableInstances, true);
-
- break;
- case disable:
- admin.enableInstance(clusterId, enableInstances, false);
- break;
- default:
- _logger.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
- }
- } catch (Exception e) {
- _logger.error("Failed in updating instances : " + content, e);
- return badRequest(e.getMessage());
- }
- return OK();
- }
-
- @GET
- @Path("{instanceName}")
- public Response getInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- Map<String, Object> instanceMap = new HashMap<>();
- instanceMap.put(Properties.id.name(), JsonNodeFactory.instance.textNode(instanceName));
- instanceMap.put(InstanceProperties.liveInstance.name(), null);
-
- InstanceConfig instanceConfig =
- accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
- LiveInstance liveInstance =
- accessor.getProperty(accessor.keyBuilder().liveInstance(instanceName));
-
- if (instanceConfig != null) {
- instanceMap.put(InstanceProperties.config.name(), instanceConfig.getRecord());
- } else {
- return notFound();
- }
-
- if (liveInstance != null) {
- instanceMap.put(InstanceProperties.liveInstance.name(), liveInstance.getRecord());
- }
-
- return JSONRepresentation(instanceMap);
- }
-
- @PUT
- @Path("{instanceName}")
- public Response addInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName, String content) {
- HelixAdmin admin = getHelixAdmin();
- ZNRecord record;
- try {
- record = toZNRecord(content);
- } catch (IOException e) {
- _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
- return badRequest("Input is not a vaild ZNRecord!");
- }
-
- try {
- admin.addInstance(clusterId, new InstanceConfig(record));
- } catch (Exception ex) {
- _logger.error("Error in adding an instance: " + instanceName, ex);
- return serverError(ex);
- }
-
- return OK();
- }
-
- @POST
- @Path("{instanceName}")
- public Response updateInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName, @QueryParam("command") String command,
- String content) {
- Command cmd;
- try {
- cmd = Command.valueOf(command);
- } catch (Exception e) {
- return badRequest("Invalid command : " + command);
- }
-
- HelixAdmin admin = getHelixAdmin();
- try {
- JsonNode node = null;
- if (content.length() != 0) {
- node = OBJECT_MAPPER.readTree(content);
- }
-
- switch (cmd) {
- case enable:
- admin.enableInstance(clusterId, instanceName, true);
- break;
- case disable:
- admin.enableInstance(clusterId, instanceName, false);
- break;
- case reset:
- if (!validInstance(node, instanceName)) {
- return badRequest("Instance names are not match!");
- }
- admin.resetPartition(clusterId, instanceName,
- node.get(InstanceProperties.resource.name()).toString(), (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.partitions.name()).toString(),
- OBJECT_MAPPER.getTypeFactory()
- .constructCollectionType(List.class, String.class)));
- break;
- case addInstanceTag:
- if (!validInstance(node, instanceName)) {
- return badRequest("Instance names are not match!");
- }
- for (String tag : (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instanceTags.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
- admin.addInstanceTag(clusterId, instanceName, tag);
- }
- break;
- case removeInstanceTag:
- if (!validInstance(node, instanceName)) {
- return badRequest("Instance names are not match!");
- }
- for (String tag : (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.instanceTags.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
- admin.removeInstanceTag(clusterId, instanceName, tag);
- }
- break;
- case enablePartitions:
- admin.enablePartition(true, clusterId, instanceName,
- node.get(InstanceProperties.resource.name()).getTextValue(),
- (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.partitions.name()).toString(),
- OBJECT_MAPPER.getTypeFactory()
- .constructCollectionType(List.class, String.class)));
- break;
- case disablePartitions:
- admin.enablePartition(false, clusterId, instanceName,
- node.get(InstanceProperties.resource.name()).getTextValue(),
- (List<String>) OBJECT_MAPPER
- .readValue(node.get(InstanceProperties.partitions.name()).toString(),
- OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)));
- break;
- default:
- _logger.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
- }
- } catch (Exception e) {
- _logger.error("Failed in updating instance : " + instanceName, e);
- return badRequest(e.getMessage());
- }
- return OK();
- }
-
- @DELETE
- @Path("{instanceName}")
- public Response deleteInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) {
- HelixAdmin admin = getHelixAdmin();
- try {
- InstanceConfig instanceConfig = admin.getInstanceConfig(clusterId, instanceName);
- admin.dropInstance(clusterId, instanceConfig);
- } catch (HelixException e) {
- return badRequest(e.getMessage());
- }
-
- return OK();
- }
-
- @GET
- @Path("{instanceName}/configs")
- public Response getInstanceConfig(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- InstanceConfig instanceConfig =
- accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
-
- if (instanceConfig != null) {
- return JSONRepresentation(instanceConfig.getRecord());
- }
-
- return notFound();
- }
-
- @PUT
- @Path("{instanceName}/configs")
- public Response updateInstanceConfig(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName, String content) throws IOException {
- HelixAdmin admin = getHelixAdmin();
- ZNRecord record;
- try {
- record = toZNRecord(content);
- } catch (IOException e) {
- _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
- return badRequest("Input is not a vaild ZNRecord!");
- }
-
- try {
- admin.setInstanceConfig(clusterId, instanceName, new InstanceConfig(record));
- } catch (Exception ex) {
- _logger.error("Error in update instance config: " + instanceName, ex);
- return serverError(ex);
- }
-
- return OK();
- }
-
- @GET
- @Path("{instanceName}/resources")
- public Response getResourcesOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
-
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), instanceName);
- ArrayNode resourcesNode = root.putArray(InstanceProperties.resources.name());
-
- List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName));
- if (sessionIds == null || sessionIds.size() == 0) {
- return null;
- }
-
- // Only get resource list from current session id
- String currentSessionId = sessionIds.get(0);
-
- List<String> resources =
- accessor.getChildNames(accessor.keyBuilder().currentStates(instanceName, currentSessionId));
- if (resources != null && resources.size() > 0) {
- resourcesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(resources));
- }
-
- return JSONRepresentation(root);
- }
-
- @GET
- @Path("{instanceName}/resources/{resourceName}")
- public Response getResourceOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName,
- @PathParam("resourceName") String resourceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- List<String> sessionIds = accessor.getChildNames(accessor.keyBuilder().sessions(instanceName));
- if (sessionIds == null || sessionIds.size() == 0) {
- return notFound();
- }
-
- // Only get resource list from current session id
- String currentSessionId = sessionIds.get(0);
- CurrentState resourceCurrentState = accessor
- .getProperty(accessor.keyBuilder().currentState(instanceName, currentSessionId, resourceName));
- if (resourceCurrentState != null) {
- return JSONRepresentation(resourceCurrentState.getRecord());
- }
-
- return notFound();
- }
-
- @GET
- @Path("{instanceName}/errors")
- public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
-
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), instanceName);
- ObjectNode errorsNode = JsonNodeFactory.instance.objectNode();
-
- List<String> sessionIds =
- accessor.getChildNames(accessor.keyBuilder().errors(instanceName));
-
- if (sessionIds == null || sessionIds.size() == 0) {
- return notFound();
- }
-
- for (String sessionId : sessionIds) {
- List<String> resources =
- accessor.getChildNames(accessor.keyBuilder().errors(instanceName, sessionId));
- if (resources != null) {
- ObjectNode resourcesNode = JsonNodeFactory.instance.objectNode();
- for (String resourceName : resources) {
- List<String> partitions = accessor
- .getChildNames(accessor.keyBuilder().errors(instanceName, sessionId, resourceName));
- if (partitions != null) {
- ArrayNode partitionsNode = resourcesNode.putArray(resourceName);
- partitionsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(partitions));
- }
- }
- errorsNode.put(sessionId, resourcesNode);
- }
- }
- root.put(InstanceProperties.errors.name(), errorsNode);
-
- return JSONRepresentation(root);
- }
-
- @GET
- @Path("{instanceName}/errors/{sessionId}/{resourceName}/{partitionName}")
- public Response getErrorsOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName, @PathParam("sessionId") String sessionId,
- @PathParam("resourceName") String resourceName,
- @PathParam("partitionName") String partitionName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- Error error = accessor.getProperty(accessor.keyBuilder()
- .stateTransitionError(instanceName, sessionId, resourceName, partitionName));
- if (error != null) {
- return JSONRepresentation(error.getRecord());
- }
-
- return notFound();
- }
-
- @GET
- @Path("{instanceName}/history")
- public Response getHistoryOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- ParticipantHistory history =
- accessor.getProperty(accessor.keyBuilder().participantHistory(instanceName));
- if (history != null) {
- return JSONRepresentation(history.getRecord());
- }
- return notFound();
- }
-
- @GET
- @Path("{instanceName}/messages")
- public Response getMessagesOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
-
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), instanceName);
- ArrayNode newMessages = root.putArray(InstanceProperties.new_messages.name());
- ArrayNode readMessages = root.putArray(InstanceProperties.read_messages.name());
-
-
- List<String> messages =
- accessor.getChildNames(accessor.keyBuilder().messages(instanceName));
- if (messages == null || messages.size() == 0) {
- return notFound();
- }
-
- for (String messageName : messages) {
- Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageName));
- if (message.getMsgState() == Message.MessageState.NEW) {
- newMessages.add(messageName);
- }
-
- if (message.getMsgState() == Message.MessageState.READ) {
- readMessages.add(messageName);
- }
- }
-
- root.put(InstanceProperties.total_message_count.name(),
- newMessages.size() + readMessages.size());
- root.put(InstanceProperties.read_message_count.name(), readMessages.size());
-
- return JSONRepresentation(root);
- }
-
- @GET
- @Path("{instanceName}/messages/{messageId}")
- public Response getMessageOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName,
- @PathParam("messageId") String messageId) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- Message message = accessor.getProperty(accessor.keyBuilder().message(instanceName, messageId));
- if (message != null) {
- return JSONRepresentation(message.getRecord());
- }
-
- return notFound();
- }
-
- @GET
- @Path("{instanceName}/healthreports")
- public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
-
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), instanceName);
- ArrayNode healthReportsNode = root.putArray(InstanceProperties.healthreports.name());
-
- List<String> healthReports =
- accessor.getChildNames(accessor.keyBuilder().healthReports(instanceName));
-
- if (healthReports != null && healthReports.size() > 0) {
- healthReportsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(healthReports));
- }
-
- return JSONRepresentation(root);
- }
-
- @GET
- @Path("{instanceName}/healthreports/{reportName}")
- public Response getHealthReportsOnInstance(@PathParam("clusterId") String clusterId,
- @PathParam("instanceName") String instanceName,
- @PathParam("reportName") String reportName) throws IOException {
- HelixDataAccessor accessor = getDataAccssor(clusterId);
- HealthStat healthStat =
- accessor.getProperty(accessor.keyBuilder().healthReport(instanceName, reportName));
- if (healthStat != null) {
- return JSONRepresentation(healthStat);
- }
-
- return notFound();
- }
-
- private boolean validInstance(JsonNode node, String instanceName) {
- return instanceName.equals(node.get(Properties.id.name()).getValueAsText());
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java
deleted file mode 100644
index db6e6ad..0000000
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/JobAccessor.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package org.apache.helix.rest.server.resources;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Response;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobContext;
-import org.apache.helix.task.TaskConfig;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.WorkflowConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.codehaus.jackson.node.ObjectNode;
-
-@Path("/clusters/{clusterId}/workflows/{workflowName}/jobs")
-public class JobAccessor extends AbstractResource {
- private static Logger _logger = LoggerFactory.getLogger(JobAccessor.class.getName());
-
- public enum JobProperties {
- Jobs,
- JobConfig,
- JobContext,
- TASK_COMMAND
- }
-
- @GET
- public Response getJobs(@PathParam("clusterId") String clusterId,
- @PathParam("workflowName") String workflowName) {
- TaskDriver driver = getTaskDriver(clusterId);
- WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName);
- ObjectNode root = JsonNodeFactory.instance.objectNode();
-
- if (workflowConfig == null) {
- return badRequest(String.format("Workflow %s is not found!", workflowName));
- }
-
- Set<String> jobs = workflowConfig.getJobDag().getAllNodes();
- root.put(Properties.id.name(), JobProperties.Jobs.name());
- ArrayNode jobsNode = root.putArray(JobProperties.Jobs.name());
-
- if (jobs != null) {
- jobsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(jobs));
- }
- return JSONRepresentation(root);
- }
-
- @GET
- @Path("{jobName}")
- public Response getJob(@PathParam("clusterId") String clusterId,
- @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
- TaskDriver driver = getTaskDriver(clusterId);
- Map<String, ZNRecord> jobMap = new HashMap<>();
-
-
- JobConfig jobConfig = driver.getJobConfig(jobName);
- if (jobConfig != null) {
- jobMap.put(JobProperties.JobConfig.name(), jobConfig.getRecord());
- } else {
- return badRequest(String.format("Job config for %s does not exists", jobName));
- }
-
- JobContext jobContext =
- driver.getJobContext(jobName);
- jobMap.put(JobProperties.JobContext.name(), null);
-
- if (jobContext != null) {
- jobMap.put(JobProperties.JobContext.name(), jobContext.getRecord());
- }
-
- return JSONRepresentation(jobMap);
- }
-
- @PUT
- @Path("{jobName}")
- public Response addJob(@PathParam("clusterId") String clusterId,
- @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName,
- String content) {
- ZNRecord record;
- TaskDriver driver = getTaskDriver(clusterId);
-
- try {
- record = toZNRecord(content);
- JobConfig.Builder jobConfig = JobAccessor.getJobConfig(record);
- driver.enqueueJob(workflowName, jobName, jobConfig);
- } catch (HelixException e) {
- return badRequest(
- String.format("Failed to enqueue job %s for reason : %s", jobName, e.getMessage()));
- } catch (IOException e) {
- return badRequest(String.format("Invalid input for Job Config of Job : %s", jobName));
- }
-
- return OK();
- }
-
- @DELETE
- @Path("{jobName}")
- public Response deleteJob(@PathParam("clusterId") String clusterId,
- @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
- TaskDriver driver = getTaskDriver(clusterId);
-
- try {
- driver.deleteJob(workflowName, jobName);
- } catch (Exception e) {
- return badRequest(e.getMessage());
- }
-
- return OK();
- }
-
- @GET
- @Path("{jobName}/configs")
- public Response getJobConfig(@PathParam("clusterId") String clusterId,
- @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
- TaskDriver driver = getTaskDriver(clusterId);
-
- JobConfig jobConfig = driver.getJobConfig(jobName);
- if (jobConfig != null) {
- return JSONRepresentation(jobConfig.getRecord());
- }
- return badRequest("Job config for " + jobName + " does not exists");
- }
-
- @GET
- @Path("{jobName}/context")
- public Response getJobContext(@PathParam("clusterId") String clusterId,
- @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) {
- TaskDriver driver = getTaskDriver(clusterId);
-
- JobContext jobContext =
- driver.getJobContext(jobName);
- if (jobContext != null) {
- return JSONRepresentation(jobContext.getRecord());
- }
- return badRequest("Job context for " + jobName + " does not exists");
- }
-
- protected static JobConfig.Builder getJobConfig(Map<String, String> cfgMap) {
- return new JobConfig.Builder().fromMap(cfgMap);
- }
-
- protected static JobConfig.Builder getJobConfig(ZNRecord record) {
- JobConfig.Builder jobConfig = new JobConfig.Builder().fromMap(record.getSimpleFields());
- jobConfig.addTaskConfigMap(getTaskConfigMap(record.getMapFields()));
-
- return jobConfig;
- }
-
- private static Map<String, TaskConfig> getTaskConfigMap(
- Map<String, Map<String, String>> taskConfigs) {
- Map<String, TaskConfig> taskConfigsMap = new HashMap<>();
- if (taskConfigs == null || taskConfigs.isEmpty()) {
- return Collections.emptyMap();
- }
-
- for (Map<String, String> taskConfigMap : taskConfigs.values()) {
- if (!taskConfigMap.containsKey(JobProperties.TASK_COMMAND.name())) {
- continue;
- }
-
- TaskConfig taskConfig =
- new TaskConfig(taskConfigMap.get(JobProperties.TASK_COMMAND.name()), taskConfigMap);
- taskConfigsMap.put(taskConfig.getId(), taskConfig);
- }
-
- return taskConfigsMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java
deleted file mode 100644
index 7ea571f..0000000
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/ResourceAccessor.java
+++ /dev/null
@@ -1,278 +0,0 @@
-package org.apache.helix.rest.server.resources;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Response;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixException;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.ResourceConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.codehaus.jackson.node.ObjectNode;
-
-@Path("/clusters/{clusterId}/resources")
-public class ResourceAccessor extends AbstractResource {
- private final static Logger _logger = LoggerFactory.getLogger(ResourceAccessor.class);
- public enum ResourceProperties {
- idealState,
- idealStates,
- externalView,
- externalViews,
- resourceConfig,
- }
-
- @GET
- public Response getResources(@PathParam("clusterId") String clusterId) {
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- root.put(Properties.id.name(), JsonNodeFactory.instance.textNode(clusterId));
-
- ZkClient zkClient = getZkClient();
-
- ArrayNode idealStatesNode = root.putArray(ResourceProperties.idealStates.name());
- ArrayNode externalViewsNode = root.putArray(ResourceProperties.externalViews.name());
-
- List<String> idealStates = zkClient.getChildren(PropertyPathBuilder.idealState(clusterId));
- List<String> externalViews = zkClient.getChildren(PropertyPathBuilder.externalView(clusterId));
-
- if (idealStates != null) {
- idealStatesNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(idealStates));
- } else {
- return notFound();
- }
-
- if (externalViews != null) {
- externalViewsNode.addAll((ArrayNode) OBJECT_MAPPER.valueToTree(externalViews));
- }
-
- return JSONRepresentation(root);
- }
-
- @GET
- @Path("{resourceName}")
- public Response getResource(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName) throws IOException {
- ConfigAccessor accessor = getConfigAccessor();
- HelixAdmin admin = getHelixAdmin();
-
- ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
- IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
- ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
-
- Map<String, ZNRecord> resourceMap = new HashMap<>();
- if (idealState != null) {
- resourceMap.put(ResourceProperties.idealState.name(), idealState.getRecord());
- } else {
- return notFound();
- }
-
- resourceMap.put(ResourceProperties.resourceConfig.name(), null);
- resourceMap.put(ResourceProperties.externalView.name(), null);
-
- if (resourceConfig != null) {
- resourceMap.put(ResourceProperties.resourceConfig.name(), resourceConfig.getRecord());
- }
-
- if (externalView != null) {
- resourceMap.put(ResourceProperties.externalView.name(), externalView.getRecord());
- }
-
- return JSONRepresentation(resourceMap);
- }
-
- @PUT
- @Path("{resourceName}")
- public Response addResource(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName,
- @DefaultValue("-1") @QueryParam("numPartitions") int numPartitions,
- @DefaultValue("") @QueryParam("stateModelRef") String stateModelRef,
- @DefaultValue("SEMI_AUTO") @QueryParam("rebalancerMode") String rebalancerMode,
- @DefaultValue("DEFAULT") @QueryParam("rebalanceStrategy") String rebalanceStrategy,
- @DefaultValue("0") @QueryParam("bucketSize") int bucketSize,
- @DefaultValue("-1") @QueryParam("maxPartitionsPerInstance") int maxPartitionsPerInstance,
- String content) {
-
- HelixAdmin admin = getHelixAdmin();
-
- try {
- if (content.length() != 0) {
- ZNRecord record;
- try {
- record = toZNRecord(content);
- } catch (IOException e) {
- _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
- return badRequest("Input is not a vaild ZNRecord!");
- }
-
- if (record.getSimpleFields() != null) {
- admin.addResource(clusterId, resourceName, new IdealState(record));
- }
- } else {
- admin.addResource(clusterId, resourceName, numPartitions, stateModelRef, rebalancerMode,
- rebalanceStrategy, bucketSize, maxPartitionsPerInstance);
- }
- } catch (Exception e) {
- _logger.error("Error in adding a resource: " + resourceName, e);
- return serverError(e);
- }
-
- return OK();
- }
-
- @POST
- @Path("{resourceName}")
- public Response updateResource(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName, @QueryParam("command") String command,
- @DefaultValue("-1") @QueryParam("replicas") int replicas,
- @DefaultValue("") @QueryParam("keyPrefix") String keyPrefix,
- @DefaultValue("") @QueryParam("group") String group){
- Command cmd;
- try {
- cmd = Command.valueOf(command);
- } catch (Exception e) {
- return badRequest("Invalid command : " + command);
- }
-
- HelixAdmin admin = getHelixAdmin();
- try {
- switch (cmd) {
- case enable:
- admin.enableResource(clusterId, resourceName, true);
- break;
- case disable:
- admin.enableResource(clusterId, resourceName, false);
- break;
- case rebalance:
- if (replicas == -1) {
- return badRequest("Number of replicas is needed for rebalancing!");
- }
- keyPrefix = keyPrefix.length() == 0 ? resourceName : keyPrefix;
- admin.rebalance(clusterId, resourceName, replicas, keyPrefix, group);
- break;
- default:
- _logger.error("Unsupported command :" + command);
- return badRequest("Unsupported command :" + command);
- }
- } catch (Exception e) {
- _logger.error("Failed in updating resource : " + resourceName, e);
- return badRequest(e.getMessage());
- }
- return OK();
- }
-
- @DELETE
- @Path("{resourceName}")
- public Response deleteResource(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName) {
- HelixAdmin admin = getHelixAdmin();
- try {
- admin.dropResource(clusterId, resourceName);
- } catch (Exception e) {
- _logger.error("Error in deleting a resource: " + resourceName, e);
- return serverError();
- }
- return OK();
- }
-
- @GET
- @Path("{resourceName}/configs")
- public Response getResourceConfig(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName) {
- ConfigAccessor accessor = getConfigAccessor();
- ResourceConfig resourceConfig = accessor.getResourceConfig(clusterId, resourceName);
- if (resourceConfig != null) {
- return JSONRepresentation(resourceConfig.getRecord());
- }
-
- return notFound();
- }
-
- @POST
- @Path("{resourceName}/configs")
- public Response updateResourceConfig(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName, String content) {
- ZNRecord record;
- try {
- record = toZNRecord(content);
- } catch (IOException e) {
- _logger.error("Failed to deserialize user's input " + content + ", Exception: " + e);
- return badRequest("Input is not a vaild ZNRecord!");
- }
- ResourceConfig resourceConfig = new ResourceConfig(record);
- ConfigAccessor configAccessor = getConfigAccessor();
- try {
- configAccessor.updateResourceConfig(clusterId, resourceName, resourceConfig);
- } catch (HelixException ex) {
- return notFound(ex.getMessage());
- } catch (Exception ex) {
- _logger.error(
- "Failed to update cluster config, cluster " + clusterId + " new config: " + content
- + ", Exception: " + ex);
- return serverError(ex);
- }
- return OK();
- }
-
- @GET
- @Path("{resourceName}/idealState")
- public Response getResourceIdealState(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName) {
- HelixAdmin admin = getHelixAdmin();
- IdealState idealState = admin.getResourceIdealState(clusterId, resourceName);
- if (idealState != null) {
- return JSONRepresentation(idealState.getRecord());
- }
-
- return notFound();
- }
-
- @GET
- @Path("{resourceName}/externalView")
- public Response getResourceExternalView(@PathParam("clusterId") String clusterId,
- @PathParam("resourceName") String resourceName) {
- HelixAdmin admin = getHelixAdmin();
- ExternalView externalView = admin.getResourceExternalView(clusterId, resourceName);
- if (externalView != null) {
- return JSONRepresentation(externalView.getRecord());
- }
-
- return notFound();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java
deleted file mode 100644
index 7ef22a4..0000000
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/UIResourceAccessor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.helix.rest.server.resources;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.PathSegment;
-import javax.ws.rs.core.Response;
-import java.io.InputStream;
-import java.util.List;
-
-@Path("/ui")
-public class UIResourceAccessor extends AbstractResource {
- private static final String INDEX_PAGE = "index.html";
- private static final String UI_RESOURCE_FOLDER = "ui";
-
- @GET
- public Response getIndex() {
- return getStaticFile(INDEX_PAGE);
- }
-
- @GET
- @Path("{fileName}")
- public Response getStaticFile(@PathParam("fileName") String fileName) {
- InputStream is = getClass().getClassLoader().getResourceAsStream(UI_RESOURCE_FOLDER + "/" + fileName);
-
- if (is == null) {
- // forward any other requests to index except index is not found
- return fileName.equalsIgnoreCase(INDEX_PAGE) ? notFound() : getIndex();
- }
-
- return Response.ok(is, MediaType.TEXT_HTML).build();
- }
-
- @GET
- @Path("{any: .*}")
- public Response getStaticFile(@PathParam("any") List<PathSegment> segments) {
- // get the last segment
- String fileName = segments.get(segments.size() - 1).getPath();
-
- return getStaticFile(fileName);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/40710b27/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java
deleted file mode 100644
index 398a4d2..0000000
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/WorkflowAccessor.java
+++ /dev/null
@@ -1,325 +0,0 @@
-package org.apache.helix.rest.server.resources;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Response;
-
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.task.JobConfig;
-import org.apache.helix.task.JobDag;
-import org.apache.helix.task.JobQueue;
-import org.apache.helix.task.TaskDriver;
-import org.apache.helix.task.Workflow;
-import org.apache.helix.task.WorkflowConfig;
-import org.apache.helix.task.WorkflowContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.type.TypeFactory;
-import org.codehaus.jackson.node.ArrayNode;
-import org.codehaus.jackson.node.JsonNodeFactory;
-import org.codehaus.jackson.node.ObjectNode;
-import org.codehaus.jackson.node.TextNode;
-
-@Path("/clusters/{clusterId}/workflows")
-public class WorkflowAccessor extends AbstractResource {
- private static Logger _logger = LoggerFactory.getLogger(WorkflowAccessor.class.getName());
-
- public enum WorkflowProperties {
- Workflows,
- WorkflowConfig,
- WorkflowContext,
- Jobs,
- ParentJobs
- }
-
- public enum TaskCommand {
- stop,
- resume,
- clean
- }
-
- @GET
- public Response getWorkflows(@PathParam("clusterId") String clusterId) {
- TaskDriver taskDriver = getTaskDriver(clusterId);
- Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
- Map<String, List<String>> dataMap = new HashMap<>();
- dataMap.put(WorkflowProperties.Workflows.name(), new ArrayList<>(workflowConfigMap.keySet()));
-
- return JSONRepresentation(dataMap);
- }
-
- @GET
- @Path("{workflowId}")
- public Response getWorkflow(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId) {
- TaskDriver taskDriver = getTaskDriver(clusterId);
- WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId);
- WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId);
-
- ObjectNode root = JsonNodeFactory.instance.objectNode();
- TextNode id = JsonNodeFactory.instance.textNode(workflowId);
- root.put(Properties.id.name(), id);
-
- ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode();
- ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode();
-
- if (workflowConfig != null) {
- getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord());
- }
-
- if (workflowContext != null) {
- getWorkflowContextNode(workflowContextNode, workflowContext.getRecord());
- }
-
- root.put(WorkflowProperties.WorkflowConfig.name(), workflowConfigNode);
- root.put(WorkflowProperties.WorkflowContext.name(), workflowContextNode);
-
- JobDag jobDag = workflowConfig.getJobDag();
- ArrayNode jobs = OBJECT_MAPPER.valueToTree(jobDag.getAllNodes());
- ObjectNode parentJobs = OBJECT_MAPPER.valueToTree(jobDag.getParentsToChildren());
- root.put(WorkflowProperties.Jobs.name(), jobs);
- root.put(WorkflowProperties.ParentJobs.name(), parentJobs);
-
- return JSONRepresentation(root);
- }
-
- @PUT
- @Path("{workflowId}")
- public Response createWorkflow(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId, String content) {
- TaskDriver driver = getTaskDriver(clusterId);
- Map<String, String> cfgMap;
- try {
- JsonNode root = OBJECT_MAPPER.readTree(content);
- cfgMap = OBJECT_MAPPER
- .readValue(root.get(WorkflowProperties.WorkflowConfig.name()).toString(),
- TypeFactory.defaultInstance()
- .constructMapType(HashMap.class, String.class, String.class));
-
- WorkflowConfig workflowConfig = WorkflowConfig.Builder.fromMap(cfgMap).build();
-
- // Since JobQueue can keep adding jobs, Helix create JobQueue will ignore the jobs
- if (workflowConfig.isJobQueue()) {
- driver.start(new JobQueue.Builder(workflowId).setWorkflowConfig(workflowConfig).build());
- return OK();
- }
-
- Workflow.Builder workflow = new Workflow.Builder(workflowId);
-
- if (root.get(WorkflowProperties.Jobs.name()) != null) {
- Map<String, JobConfig.Builder> jobConfigs =
- getJobConfigs((ArrayNode) root.get(WorkflowProperties.Jobs.name()));
- for (Map.Entry<String, JobConfig.Builder> job : jobConfigs.entrySet()) {
- workflow.addJob(job.getKey(), job.getValue());
- }
- }
-
- if (root.get(WorkflowProperties.ParentJobs.name()) != null) {
- Map<String, List<String>> parentJobs = OBJECT_MAPPER
- .readValue(root.get(WorkflowProperties.ParentJobs.name()).toString(),
- TypeFactory.defaultInstance()
- .constructMapType(HashMap.class, String.class, List.class));
- for (Map.Entry<String, List<String>> entry : parentJobs.entrySet()) {
- String parentJob = entry.getKey();
- for (String childJob : entry.getValue()) {
- workflow.addParentChildDependency(parentJob, childJob);
- }
- }
- }
-
- driver.start(workflow.build());
- } catch (IOException e) {
- return badRequest(String
- .format("Invalid input of Workflow %s for reason : %s", workflowId, e.getMessage()));
- } catch (HelixException e) {
- return badRequest(String
- .format("Failed to create workflow %s for reason : %s", workflowId, e.getMessage()));
- }
- return OK();
- }
-
- @DELETE
- @Path("{workflowId}")
- public Response deleteWorkflow(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId) {
- TaskDriver driver = getTaskDriver(clusterId);
- try {
- driver.delete(workflowId);
- } catch (HelixException e) {
- return badRequest(String
- .format("Failed to delete workflow %s for reason : %s", workflowId, e.getMessage()));
- }
- return OK();
- }
-
- @POST
- @Path("{workflowId}")
- public Response updateWorkflow(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId, @QueryParam("command") String command) {
- TaskDriver driver = getTaskDriver(clusterId);
-
- try {
- TaskCommand cmd = TaskCommand.valueOf(command);
- switch (cmd) {
- case stop:
- driver.stop(workflowId);
- break;
- case resume:
- driver.resume(workflowId);
- break;
- case clean:
- driver.cleanupQueue(workflowId);
- break;
- default:
- return badRequest(String.format("Invalid command : %s", command));
- }
- } catch (HelixException e) {
- return badRequest(
- String.format("Failed to execute operation %s for reason : %s", command, e.getMessage()));
- } catch (Exception e) {
- return serverError(e);
- }
-
- return OK();
- }
-
- @GET
- @Path("{workflowId}/configs")
- public Response getWorkflowConfig(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId) {
- TaskDriver taskDriver = getTaskDriver(clusterId);
- WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflowId);
- ObjectNode workflowConfigNode = JsonNodeFactory.instance.objectNode();
- if (workflowConfig != null) {
- getWorkflowConfigNode(workflowConfigNode, workflowConfig.getRecord());
- }
-
- return JSONRepresentation(workflowConfigNode);
- }
-
- @POST
- @Path("{workflowId}/configs")
- public Response updateWorkflowConfig(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId, String content) {
- ZNRecord record;
- TaskDriver driver = getTaskDriver(clusterId);
-
- try {
- record = toZNRecord(content);
-
- WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowId);
- if (workflowConfig == null) {
- return badRequest(
- String.format("WorkflowConfig for workflow %s does not exists!", workflowId));
- }
-
- workflowConfig.getRecord().update(record);
- driver.updateWorkflow(workflowId, workflowConfig);
- } catch (HelixException e) {
- return badRequest(
- String.format("Failed to update WorkflowConfig for workflow %s", workflowId));
- } catch (Exception e) {
- return badRequest(String.format("Invalid WorkflowConfig for workflow %s", workflowId));
- }
-
- return OK();
- }
-
- @GET
- @Path("{workflowId}/context")
- public Response getWorkflowContext(@PathParam("clusterId") String clusterId,
- @PathParam("workflowId") String workflowId) {
- TaskDriver taskDriver = getTaskDriver(clusterId);
- WorkflowContext workflowContext = taskDriver.getWorkflowContext(workflowId);
- ObjectNode workflowContextNode = JsonNodeFactory.instance.objectNode();
- if (workflowContext != null) {
- getWorkflowContextNode(workflowContextNode, workflowContext.getRecord());
- }
-
- return JSONRepresentation(workflowContextNode);
- }
-
- private void getWorkflowConfigNode(ObjectNode workflowConfigNode, ZNRecord record) {
- for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) {
- if (!entry.getKey().equals(WorkflowConfig.WorkflowConfigProperty.Dag)) {
- workflowConfigNode.put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue()));
- }
- }
- }
-
- private void getWorkflowContextNode(ObjectNode workflowContextNode, ZNRecord record) {
- if (record.getMapFields() != null) {
- for (String fieldName : record.getMapFields().keySet()) {
- JsonNode node = OBJECT_MAPPER.valueToTree(record.getMapField(fieldName));
- workflowContextNode.put(fieldName, node);
- }
- }
-
- if (record.getSimpleFields() != null) {
- for (Map.Entry<String, String> entry : record.getSimpleFields().entrySet()) {
- workflowContextNode
- .put(entry.getKey(), JsonNodeFactory.instance.textNode(entry.getValue()));
- }
- }
- }
-
- private Map<String, JobConfig.Builder> getJobConfigs(ArrayNode root)
- throws HelixException, IOException {
- Map<String, JobConfig.Builder> jobConfigsMap = new HashMap<>();
- for (Iterator<JsonNode> it = root.getElements(); it.hasNext(); ) {
- JsonNode job = it.next();
- ZNRecord record = null;
-
- try {
- record = toZNRecord(job.toString());
- } catch (IOException e) {
- // Ignore the parse since it could be just simple fields
- }
-
- if (record == null || record.getSimpleFields().isEmpty()) {
- Map<String, String> cfgMap = OBJECT_MAPPER.readValue(job.toString(),
- TypeFactory.defaultInstance()
- .constructMapType(HashMap.class, String.class, String.class));
- jobConfigsMap
- .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(cfgMap));
- } else {
- jobConfigsMap
- .put(job.get(Properties.id.name()).getTextValue(), JobAccessor.getJobConfig(record));
- }
- }
-
- return jobConfigsMap;
- }
-}