You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2015/02/19 06:13:44 UTC
[1/4] helix git commit: Refactoring cluster.py: Changing from module
to class based functions. Abstracting Cluster object. Adding ZKCluster
object.
Repository: helix
Updated Branches:
refs/heads/master 01222c4f6 -> 80a4a13fd
Refactoring cluster.py:
Changing from module to class based functions.
Abstracting Cluster object.
Adding ZKCluster object.
functions.py:
Put all functions inside a class.
Switched to using a class variable for host selection.
zkfunctions.py:
Adding support for direct zookeeper access.
statemodeldefs.py:
Constants for state model definitions.
test/test_helix.py:
Unit tests for both Rest object and zookeeper object.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a714f002
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a714f002
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a714f002
Branch: refs/heads/master
Commit: a714f002cd2d9dd98bd86723da787c94f0d70ee7
Parents: 01222c4
Author: Casey Miller <ca...@linkedin.com>
Authored: Fri Feb 13 19:43:24 2015 -0800
Committer: Casey Miller <ca...@linkedin.com>
Committed: Fri Feb 13 19:43:24 2015 -0800
----------------------------------------------------------------------
contributors/py-helix-admin/helix/cluster.py | 91 +-
contributors/py-helix-admin/helix/functions.py | 925 +++++++++----------
.../py-helix-admin/helix/statemodeldefs.py | 39 +
.../py-helix-admin/helix/test/test_helix.py | 60 ++
.../py-helix-admin/helix/zkfunctions.py | 522 +++++++++++
5 files changed, 1111 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/cluster.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/cluster.py b/contributors/py-helix-admin/helix/cluster.py
index 7db7014..175bff8 100644
--- a/contributors/py-helix-admin/helix/cluster.py
+++ b/contributors/py-helix-admin/helix/cluster.py
@@ -23,17 +23,21 @@ from partition import Partition
from resourcegroup import ResourceGroup
from helixexceptions import HelixException
-import functions
+from functions import RestHelixFunctions
+try:
+ from zkfunctions import ZookeeperHelixFunctions
+ zookeeper_ok = True
+except ImportError:
+ zookeeper_ok = False
-class Cluster(object):
+class BaseCluster(object):
"""Basic model of a cluster, holds participants, partitions, slices,
external view, ideal state, resource groups"""
ver = (1, 0)
- def __init__(self, host, cluster):
- super(Cluster, self).__init__()
- self.host = host
+ def __init__(self, cluster):
+ super(BaseCluster, self).__init__()
self.cluster = cluster
# dynamically loaded data below
@@ -48,15 +52,13 @@ class Cluster(object):
self.cluster)
def __repr__(self):
- return "{0}({1}, {2})".format(self.__class__.__name__, self.cluster,
- self.host)
+ return "{0}({1})".format(self.__class__.__name__, self.cluster)
def load_resources(self):
"""queries helix for resource groups and loades them into model"""
try:
- for cur_resource in functions.get_resource_groups(self.host,
- self.cluster):
- data = functions.get_resource_group(self.host, self.cluster,
+ for cur_resource in self.functions.get_resource_groups(self.cluster):
+ data = self.functions.get_resource_group(self.cluster,
cur_resource)
name = data["id"]
count = data["simpleFields"]["NUM_PARTITIONS"]
@@ -94,7 +96,7 @@ class Cluster(object):
def _cluster_exists(self):
"""verify cluster exists in helix"""
- if self.cluster in functions.get_clusters(self.host):
+ if self.cluster in self.functions.get_clusters():
return True
return False
@@ -103,7 +105,7 @@ class Cluster(object):
self._participants = {}
try:
- instances = functions.get_instances(self.host, self.cluster)
+ instances = self.functions.get_instances(self.cluster)
for instance in instances:
ident = instance["id"]
enabled = instance["simpleFields"]["HELIX_ENABLED"]
@@ -129,7 +131,7 @@ class Cluster(object):
"""query partitions from helix and load into model"""
self._partitions = {}
for resource in self.resources:
- newstate = functions.get_ideal_state(self.host, self.cluster,
+ newstate = self.functions.get_ideal_state(self.cluster,
resource)
self._partitions[resource] = {}
if newstate:
@@ -152,7 +154,7 @@ class Cluster(object):
self._ideal_state = {}
for resource in self.resources:
self._ideal_state[resource] = \
- functions.get_ideal_state(self.host, self.cluster, resource)
+ self.functions.get_ideal_state(self.cluster, resource)
@property
def ideal_state(self):
@@ -171,7 +173,7 @@ class Cluster(object):
self._external_view = {}
for resource in self.resources:
self._external_view[resource] = \
- functions.get_external_view(self.host, self.cluster, resource)
+ self.functions.get_external_view(self.cluster, resource)
@property
def external_view(self):
@@ -187,18 +189,18 @@ class Cluster(object):
def get_config(self, config):
""" get requested config from helix"""
- return functions.get_config(self.host, self.cluster, config)
+ return self.functions.get_config(self.cluster, config)
def set_cluster_config(self, config):
""" set given configs in helix"""
- return functions.set_config(self.host, self.cluster, config)
+ return self.functions.set_config(self.cluster, config)
def set_resource_config(self, config, resource):
""" set given configs in helix"""
rname = resource
if isinstance(resource, ResourceGroup):
rname = resource.name
- return functions.set_config(self.host, self.cluster, config,
+ return self.functions.set_config(self.cluster, config,
resource=rname)
def set_participant_config(self, config, participant):
@@ -206,34 +208,34 @@ class Cluster(object):
if isinstance(participant, Participant):
pname = participant.ident
""" set given configs in helix"""
- return functions.set_config(self.host, self.cluster, config,
+ return self.functions.set_config(self.cluster, config,
participant=pname)
def activate_cluster(self, grand, enabled=True):
"""activate this cluster with the specified grand cluster"""
- return functions.activate_cluster(self.host, self.cluster, grand,
+ return self.functions.activate_cluster(self.cluster, grand,
enabled)
def deactivate_cluster(self, grand):
"""deactivate this cluster against the given grandcluster"""
- return functions.deactivate_cluster(self.host, self.cluster, grand)
+ return self.functions.deactivate_cluster(self.cluster, grand)
def add_cluster(self):
"""add cluster to helix"""
- return functions.add_cluster(self.host, self.cluster)
+ return self.functions.add_cluster(self.cluster)
def add_instance(self, instances, port):
"""add instance to cluster"""
- return functions.add_instance(self.host, self.cluster, instances, port)
+ return self.functions.add_instance(self.cluster, instances, port)
def rebalance(self, resource, replicas, key=""):
"""rebalance a resource group"""
- return functions.rebalance(self.host, self.cluster, resource,
+ return self.functions.rebalance(self.cluster, resource,
replicas, key)
def add_resource(self, resource, partitions, state_model_def, mode=""):
"""add resource to cluster"""
- return functions.add_resource(self.host, self.cluster, resource,
+ return self.functions.add_resource(self.cluster, resource,
partitions, state_model_def, mode)
def enable_instance(self, instance, enabled=True):
@@ -245,7 +247,7 @@ class Cluster(object):
ident = instance
else:
raise HelixException("Instance must be a string or participant")
- return functions.enable_instance(self.host, self.cluster, ident,
+ return self.functions.enable_instance(self.cluster, ident,
enabled)
def disable_instance(self, instance):
@@ -272,7 +274,7 @@ class Cluster(object):
else:
raise HelixException("Partition must be a string or partition")
- return functions.enable_partition(self.host, self.cluster, resource,
+ return self.functions.enable_partition(self.cluster, resource,
part_id, ident, enabled)
def disable_partition(self, resource, partition, instance):
@@ -291,7 +293,7 @@ class Cluster(object):
raise HelixException(
"Resource must be a string or a resource group object")
- return functions.enable_resource(self.host, self.cluster,
+ return self.functions.enable_resource(self.cluster,
resource_name, enabled)
def disable_resource(self, resource):
@@ -308,7 +310,7 @@ class Cluster(object):
else:
raise HelixException("Resource must be resource object or string")
- return functions.add_resource_tag(self.host, self.cluster,
+ return self.functions.add_resource_tag(self.cluster,
resource_name, tag)
# del resource not yet available in api
@@ -322,7 +324,7 @@ class Cluster(object):
# else:
# raise HelixException("Resource must be resource object or str")
#
- # return functions.del_resource_tag(self.host, self.cluster,
+ # return self.functions.del_resource_tag(self.cluster,
# resource_name, tag)
def add_instance_tag(self, instance, tag):
@@ -335,7 +337,7 @@ class Cluster(object):
else:
raise HelixException("Instance must be a string or participant")
- return functions.add_instance_tag(self.host, self.cluster, ident, tag)
+ return self.functions.add_instance_tag(self.cluster, ident, tag)
def del_instance_tag(self, instance, tag):
ident = None
@@ -347,18 +349,37 @@ class Cluster(object):
else:
raise HelixException("Instance must be a string or participant")
- return functions.del_instance_tag(self.host, self.cluster, ident, tag)
+ return self.functions.del_instance_tag(self.cluster, ident, tag)
def del_instance(self, instance):
"""remove instance from cluster, assumes instance is a
participant object"""
- return functions.del_instance(self.host, self.cluster, instance.ident)
+ return self.functions.del_instance(self.cluster, instance.ident)
def del_resource(self, resource):
"""remove resource group from cluster, assumes resource is a
resource object"""
- return functions.del_resource(self.host, self.cluster, resource.name)
+ return self.functions.del_resource(self.cluster, resource.name)
def del_cluster(self):
"""remove cluster from helix"""
- return functions.del_cluster(self.host, self.cluster)
+ return self.functions.del_cluster(self.cluster)
+
+class Cluster(BaseCluster):
+ def __init__(self, host, cluster):
+ super(Cluster, self).__init__(cluster)
+ self.host = host
+ self.functions = RestHelixFunctions(host)
+
+
+class ZKCluster(BaseCluster):
+ def __init__(self, zookeeper_connect_string, zookeeper_root, cluster):
+ super(ZKCluster, self).__init__(cluster)
+
+ # We want to fail if kazoo cannot be found, but only if using the zookeeper object.
+ if not zookeeper_ok:
+ raise ImportError("ZKCluster requires the zkfunctions module, which could not be imported")
+
+ self.zookeeper_connect_string = zookeeper_connect_string
+ self.zookeeper_root = zookeeper_root
+ self.functions = ZookeeperHelixFunctions(self.zookeeper_connect_string, self.zookeeper_root)
http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/functions.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/functions.py b/contributors/py-helix-admin/helix/functions.py
index 5e36404..9221aca 100644
--- a/contributors/py-helix-admin/helix/functions.py
+++ b/contributors/py-helix-admin/helix/functions.py
@@ -25,538 +25,481 @@ from helixexceptions import HelixException
from helixexceptions import HelixAlreadyExistsException
from helixexceptions import HelixDoesNotExistException
-
-def _post_payload(host, path, data, **kwargs):
- """generic function to handle posting data
- :rtype : return body of page
- :param host: host to send data to
- :param path: path to interact with
- :param data: data to send
- :param kwargs: additional keyword args
- """
-
- if "http://" not in host:
- host = "http://{0}".format(host)
-
- res = Resource(host)
-
- payload = "jsonParameters={0}".format(json.dumps(data))
- for key, value in kwargs.items():
- payload += '&{0}={1}'.format(key, json.dumps(value))
- headers = {"Content-Type": "application/json"}
- # print "path is %s" % path
- page = res.post(path=path, payload=payload, headers=headers)
- body = page.body_string()
- if body:
- body = json.loads(body)
+class RestHelixFunctions:
+ def __init__(self, host):
+ if "http://" not in host:
+ self.host = "http://{0}".format(host)
+ else:
+ self.host = host
+
+ def _post_payload(self, path, data, **kwargs):
+ """generic function to handle posting data
+ :rtype : return body of page
+ :param path: path to interact with
+ :param data: data to send
+ :param kwargs: additional keyword args
+ """
+
+ res = Resource(self.host)
+
+ payload = "jsonParameters={0}".format(json.dumps(data))
+ for key, value in kwargs.items():
+ payload += '&{0}={1}'.format(key, json.dumps(value))
+ headers = {"Content-Type": "application/json"}
+ # print "path is %s" % path
+ page = res.post(path=path, payload=payload, headers=headers)
+ body = page.body_string()
+ if body:
+ body = json.loads(body)
+
+ if isinstance(body, dict) and "ERROR" in body:
+ raise HelixException(body["ERROR"])
+
+ # test what was returned, see if any exceptions need to be raise
+ # if not body:
+ # raise HelixException("body for path {0} is empty".format(path))
+ # else:
+ # print "BODY IS EMPTY FOR ", path
+ # print "BODY is %s." % body
+
+ return body
+
+
+ def _get_page(self, path):
+ """if we're specifying a cluster then verify that a cluster is set"""
+
+ res = Resource(self.host)
+
+ page = res.get(path=path)
+ data = page.body_string()
+ body = None
+ try:
+ body = json.loads(data)
+ except ValueError:
+ body = json.loads(data[:-3])
+
+ # test what was returned, see if any exceptions need to be raise
+ if not body:
+ raise HelixException("body for path {0} is empty".format(path))
if isinstance(body, dict) and "ERROR" in body:
raise HelixException(body["ERROR"])
- # test what was returned, see if any exceptions need to be raise
- # if not body:
- # raise HelixException("body for path {0} is empty".format(path))
- # else:
- # print "BODY IS EMPTY FOR ", path
- # print "BODY is %s." % body
-
- return body
-
-
-def _get_page(host, path):
- """if we're specifying a cluster then verify that a cluster is set"""
-
- if "http://" not in host:
- host = "http://{0}".format(host)
-
- res = Resource(host)
-
- page = res.get(path=path)
- data = page.body_string()
- body = None
- try:
- body = json.loads(data)
- except ValueError:
- body = json.loads(data[:-3])
-
- # test what was returned, see if any exceptions need to be raise
- if not body:
- raise HelixException("body for path {0} is empty".format(path))
-
- if isinstance(body, dict) and "ERROR" in body:
- raise HelixException(body["ERROR"])
-
- return body
-
+ return body
-def _delete_page(host, path):
- """delete page at a given path"""
- retval = None
- if "http://" not in host:
- host = "http://{0}".format(host)
- res = Resource(host)
+ def _delete_page(self, path):
+ """delete page at a given path"""
+ retval = None
- page = res.delete(path)
- data = page.body_string()
- if data:
- retval = json.loads(data)
+ res = Resource(self.host)
- return retval
+ page = res.delete(path)
+ data = page.body_string()
+ if data:
+ retval = json.loads(data)
+ return retval
-def get_clusters(host):
- """ querys helix cluster for all clusters """
- return _get_page(host, "/clusters")["listFields"]["clusters"]
+ def get_clusters(self):
+ """ querys helix cluster for all clusters """
+ return self._get_page("/clusters")["listFields"]["clusters"]
-def get_resource_groups(host, cluster):
- """ querys helix cluster for resources groups of the current cluster"""
- return _get_page(host, "/clusters/{0}/resourceGroups".format(cluster))[
- "listFields"]["ResourceGroups"]
+ def get_resource_groups(self, cluster):
+ """ querys helix cluster for resources groups of the current cluster"""
+ return self._get_page("/clusters/{0}/resourceGroups".format(cluster))[
+ "listFields"]["ResourceGroups"]
-def get_resource_tags(host, cluster):
- """returns a dict of resource tags for a cluster"""
- return _get_page(host, "/clusters/{0}/resourceGroups".format(cluster))[
- "mapFields"]["ResourceTags"]
+ def get_resource_tags(self, cluster):
+ """returns a dict of resource tags for a cluster"""
+ return self._get_page("/clusters/{0}/resourceGroups".format(cluster))[
+ "mapFields"]["ResourceTags"]
-def get_resource_group(host, cluster, resource):
- """ gets the ideal state of the specified resource group of the
- current cluster"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixException(
- "{0} is not a resource group of {1}".format(resource, cluster))
- return _get_page(host, "/clusters/{0}/resourceGroups/{1}".format(cluster,
- resource))
+ def get_resource_group(self, cluster, resource):
+ """ gets the ideal state of the specified resource group of the
+ current cluster"""
+ if resource not in self.get_resource_groups(cluster):
+ raise HelixException(
+ "{0} is not a resource group of {1}".format(resource, cluster))
+ return self._get_page("/clusters/{0}/resourceGroups/{1}".format(cluster,
+ resource))
-def get_ideal_state(host, cluster, resource):
- """ gets the ideal state of the specified resource group of the
- current cluster"""
+ def get_ideal_state(self, cluster, resource):
+ """ gets the ideal state of the specified resource group of the
+ current cluster"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixException(
- "{0} is not a resource group of {1}".format(resource, cluster))
+ if resource not in self.get_resource_groups(cluster):
+ raise HelixException(
+ "{0} is not a resource group of {1}".format(resource, cluster))
- return _get_page(host, "/clusters/{0}/resourceGroups/{1}/idealState".
- format(cluster, resource))["mapFields"]
+ return self._get_page("/clusters/{0}/resourceGroups/{1}/idealState".
+ format(cluster, resource))["mapFields"]
+ def get_external_view(self, cluster, resource):
+ """return the external view for a given cluster and resource"""
+ if resource not in self.get_resource_groups(cluster):
+ raise HelixException(
+ "{0} is not a resource group of {1}".format(resource, cluster))
-def get_external_view(host, cluster, resource):
- """return the external view for a given cluster and resource"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixException(
- "{0} is not a resource group of {1}".format(resource, cluster))
+ return self._get_page("/clusters/{0}/resourceGroups/{1}/externalView".format(
+ cluster, resource))["mapFields"]
- return _get_page(host,
- "/clusters/{0}/resourceGroups/{1}/externalView".format(
- cluster, resource))["mapFields"]
+ def get_instances(self, cluster):
+ """get list of instances registered to the cluster"""
+ if not cluster:
+ raise HelixException("Cluster must be set before "
+ "calling this function")
+ return self._get_page("/clusters/{0}/instances".format(cluster))[
+ "instanceInfo"]
-def get_instances(host, cluster):
- """get list of instances registered to the cluster"""
- if not cluster:
- raise HelixException("Cluster must be set before "
- "calling this function")
+ def get_instance_detail(self, cluster, name):
+ """get details of an instance"""
+ return self._get_page("/clusters/{0}/instances/{1}".format(cluster, name))
- return _get_page(host, "/clusters/{0}/instances".format(cluster))[
- "instanceInfo"]
+ def get_config(self, cluster, config):
+ """get requested config"""
+ return self._get_page("/clusters/{0}/configs/{1}".format(cluster, config))
+ def add_cluster(self, cluster):
+ """add a cluster to helix"""
+ if cluster in self.get_clusters():
+ raise HelixAlreadyExistsException(
+ "Cluster {0} already exists".format(cluster))
-def get_instance_detail(host, cluster, name):
- """get details of an instance"""
- return _get_page(host, "/clusters/{0}/instances/{1}".format(cluster, name))
+ data = {"command": "addCluster",
+ "clusterName": cluster}
-
-def get_config(host, cluster, config):
- """get requested config"""
- return _get_page(host, "/clusters/{0}/configs/{1}".format(cluster, config))
-
-
-def add_cluster(host, cluster):
- """add a cluster to helix"""
- if cluster in get_clusters(host):
- raise HelixAlreadyExistsException(
- "Cluster {0} already exists".format(cluster))
-
- data = {"command": "addCluster",
- "clusterName": cluster}
-
- page = _post_payload(host, "/clusters", data)
- return page
-
-
-def add_instance(host, cluster, instances, port):
- """add a list of instances to a cluster"""
- if cluster not in get_clusters(host):
- raise HelixDoesNotExistException(
- "Cluster {0} does not exist".format(cluster))
-
- if not isinstance(instances, list):
- instances = [instances]
- instances = ["{0}:{1}".format(instance, port) for instance in instances]
- try:
- newinstances = set(instances)
- oldinstances = set(
- [x["id"].replace('_', ':') for x in get_instances(host, cluster)])
- instances = list(newinstances - oldinstances)
- except HelixException:
- # this will get thrown if instances is empty,
- # which if we're just populating should happen
- pass
-
- if instances:
- data = {"command": "addInstance",
- "instanceNames": ";".join(instances)}
-
- instance_path = "/clusters/{0}/instances".format(cluster)
- # print "adding to", instance_path
- page = _post_payload(host, instance_path, data)
+ page = self._post_payload("/clusters", data)
return page
- else:
- raise HelixAlreadyExistsException(
- "All instances given already exist in cluster")
-
-
-def rebalance(host, cluster, resource, replicas, key=""):
- """rebalance the given resource group"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixException(
- "{0} is not a resource group of {1}".format(resource, cluster))
-
- data = {"command": "rebalance",
- "replicas": replicas}
-
- if key:
- data["key"] = key
- page = _post_payload(host,
- "/clusters/{0}/resourceGroups/{1}/idealState".format(
- cluster, resource), data)
- return page
-
-
-def activate_cluster(host, cluster, grand_cluster, enabled=True):
- """activate the cluster with the grand cluster"""
- if grand_cluster not in get_clusters(host):
- raise HelixException(
- "grand cluster {0} does not exist".format(grand_cluster))
-
- data = {'command': 'activateCluster',
- 'grandCluster': grand_cluster}
-
- if enabled:
- data["enabled"] = "true"
- else:
- data["enabled"] = "false"
-
- page = _post_payload(host, "/clusters/{0}".format(cluster), data)
- return page
-
-
-def deactivate_cluster(host, cluster, grand_cluster):
- """deactivate the cluster with the grand cluster"""
- return activate_cluster(host, cluster, grand_cluster, enabled=False)
-
-
-def add_resource(host, cluster, resource, partitions,
- state_model_def, mode=""):
- """Add given resource group"""
- if resource in get_resource_groups(host, cluster):
- raise HelixAlreadyExistsException(
- "ResourceGroup {0} already exists".format(resource))
-
- data = {"command": "addResource",
- "resourceGroupName": resource,
- "partitions": partitions,
- "stateModelDefRef": state_model_def}
-
- if mode:
- data["mode"] = mode
-
- return _post_payload(host, "/clusters/{0}/resourceGroups".format(cluster),
- data)
-
-
-def enable_resource(host, cluster, resource, enabled=True):
- """enable or disable specified resource"""
- data = {"command": "enableResource"}
- if enabled:
- data["enabled"] = "true"
- else:
- data["enabled"] = "false"
-
- return _post_payload(host, "/clusters/{0}/resourceGroups/{1}".format(
- cluster, resource), data)
-
-
-def disable_resource(host, cluster, resource):
- """function for disabling resources"""
- return enable_resource(host, cluster, resource, enabled=False)
-
-
-def alter_ideal_state(host, cluster, resource, newstate):
- """alter ideal state"""
- data = {"command": "alterIdealState"}
- return _post_payload(host,
- "/clusters/{0}/resourceGroups/{1}/idealState".format(
- cluster, resource), data,
- newIdealState=newstate)
-
-
-def enable_instance(host, cluster, instance, enabled=True):
- """enable instance within cluster"""
- data = {"command": "enableInstance"}
- if enabled:
- data["enabled"] = "true"
- else:
- data["enabled"] = "false"
-
- return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
- instance),
- data)
-
-
-def disable_instance(host, cluster, instance):
- """wrapper for ease of use for disabling an instance"""
- return enable_instance(host, cluster, instance, enabled=False)
-
-
-def swap_instance(host, cluster, old, new):
- """swap instance"""
- data = {"command": "swapInstance",
- "oldInstance": old,
- "newInstance": new}
-
- return _post_payload(host, "/cluster/{0}/instances".format(cluster), data)
-
-
-def enable_partition(host, cluster, resource, partition, instance,
- enabled=True):
- """enable Partition """
- if resource not in get_resource_groups(host, cluster):
- raise HelixDoesNotExistException(
- "ResourceGroup {0} does not exist".format(resource))
-
- data = {"command": "enablePartition",
- "resource": resource,
- "partition": partition,
- "enabled": enabled}
- return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
- instance),
- data)
-
-
-def disable_partition(host, cluster, resource, partitions, instance):
- """disable Partition """
- return enable_partition(host, cluster, resource, partitions, instance,
- enabled=False)
-
-
-def reset_partition(host, cluster, resource, partitions, instance):
- """reset partition"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixDoesNotExistException(
- "ResourceGroup {0} does not exist".format(resource))
-
- data = {"command": "resetPartition",
- "resource": resource,
- "partition": " ".join(partitions)}
- return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
- instance),
- data)
-
-
-def reset_resource(host, cluster, resource):
- """reset resource"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixDoesNotExistException(
- "ResourceGroup {0} does not exist".format(resource))
-
- data = {"command": "resetResource"}
- return _post_payload(host,
- "/clusters/{0}/resourceGroups/{1}".format(cluster,
- resource),
- data)
-
-
-def reset_instance(host, cluster, instance):
- """reset instance"""
- if instance not in get_instances(host, cluster):
- raise HelixDoesNotExistException(
- "Instance {0} does not exist".format(instance))
-
- data = {"command": "resetInstance"}
- return _post_payload(host, "/clusters/{0}/instances/{1}".format(cluster,
- instance),
- data)
-
-
-def add_instance_tag(host, cluster, instance, tag):
- """add tag to an instance"""
- data = {"command": "addInstanceTag",
- "instanceGroupTag": tag}
- return _post_payload(host,
- "/clusters/{0}/instances/{1}".format(
- cluster, instance), data)
-
-
-def del_instance_tag(host, cluster, instance, tag):
- """remove tag from instance"""
- data = {"command": "removeInstanceTag",
- "instanceGroupTag": tag}
- return _post_payload(host,
- "/clusters/{0}/instances/{1}".format(
- cluster, instance), data)
-
-
-def add_resource_tag(host, cluster, resource, tag):
- """add tag to resource group"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixDoesNotExistException(
- "ResourceGroup {0} does not exist".format(resource))
-
- data = {"command": "addResourceProperty",
- "INSTANCE_GROUP_TAG": tag}
- return _post_payload(host,
- "/clusters/{0}/resourceGroups/{1}/idealState".format(
- cluster, resource), data)
-
-
-"""
-del resource currently does not exist in helix api
-def del_resource_tag(host, cluster, resource, tag):
- if resource not in get_resource_groups(host, cluster):
- raise HelixDoesNotExistException(
- "ResourceGroup {0} does not exist".format(resource))
-
- data = {"command": "removeResourceProperty",
- "INSTANCE_GROUP_TAG": tag}
- return _post_payload(host,
- "/clusters/{0}/resourceGroups/{1}/idealState".format(
- cluster, resource), data)
-"""
-
-
-def get_instance_taginfo(host, cluster):
- return _get_page(host, "/clusters/{0}/instances".format(
- cluster))["tagInfo"]
-
-
-def expand_cluster(host, cluster):
- """expand cluster"""
- data = {"command": "expandCluster"}
-
- return _post_payload(host, "/clusters/{0}/".format(cluster), data)
-
-
-def expand_resource(host, cluster, resource):
- """expand resource"""
- data = {"command": "expandResource"}
-
- return _post_payload(host,
- "/clusters/{0}/resourceGroup/{1}/idealState".format(
- cluster, resource), data)
-
-
-def add_resource_property(host, cluster, resource, properties):
- """add resource property properties must be a dictionary of properties"""
- properties["command"] = "addResourceProperty"
-
- return _post_payload(host,
- "/clusters/{0}/resourceGroup/{1}/idealState".format(
- cluster, resource), properties)
-
-
-def _handle_config(host, cluster, configs, command, participant=None,
- resource=None):
- """helper function to set or delete configs in helix"""
- data = {"command": "{0}Config".format(command),
- "configs": ",".join(
- ["{0}={1}".format(x, y) for x, y in configs.items()])}
-
- address = "/clusters/{0}/configs/".format(cluster)
- if participant:
- address += "participant/{0}".format(participant)
- elif resource:
- address += "resource/{0}".format(resource)
- else:
- address += "cluster"
-
- return _post_payload(host, address, data)
-
-
-def set_config(host, cluster, configs, participant=None, resource=None):
- """sets config in helix"""
- return _handle_config(host, cluster, configs, "set", participant, resource)
-
-
-def remove_config(host, cluster, configs, participant=None, resource=None):
- """sets config in helix"""
- return _handle_config(host, "remove", cluster, configs, participant,
- resource)
-
-
-def get_zk_path(host, path):
- """get zookeeper path"""
- return _get_page(host, "zkPath/{0}".format(path))
-
-
-def del_zk_path(host, path):
- """delete zookeeper path"""
- return _delete_page(host, "zkPath/{0}".format(path))
-
-
-def get_zk_child(host, path):
- """get zookeeper child"""
- return _get_page(host, "zkChild/{0}".format(path))
-
-
-def del_zk_child(host, path):
- """delete zookeeper child"""
- return _delete_page(host, "zkChild/{0}".format(path))
-
-
-def add_state_model(host, cluster, newstate):
- """add state model"""
- data = {"command": "addStateModel"}
+ def add_instance(self, cluster, instances, port):
+ """add a list of instances to a cluster"""
+ if cluster not in self.get_clusters():
+ raise HelixDoesNotExistException(
+ "Cluster {0} does not exist".format(cluster))
+
+ if not isinstance(instances, list):
+ instances = [instances]
+ instances = ["{0}:{1}".format(instance, port) for instance in instances]
+ try:
+ newinstances = set(instances)
+ oldinstances = set(
+ [x["id"].replace('_', ':') for x in self.get_instances(cluster)])
+ instances = list(newinstances - oldinstances)
+ except HelixException:
+ # this will get thrown if instances is empty,
+ # which if we're just populating should happen
+ pass
+
+ if instances:
+ data = {"command": "addInstance",
+ "instanceNames": ";".join(instances)}
+
+ instance_path = "/clusters/{0}/instances".format(cluster)
+ # print "adding to", instance_path
+ page = self._post_payload(instance_path, data)
+ return page
+
+ else:
+ raise HelixAlreadyExistsException(
+ "All instances given already exist in cluster")
+
+ def rebalance(self, cluster, resource, replicas, key=""):
+ """rebalance the given resource group"""
+ if resource not in self.get_resource_groups(cluster):
+ raise HelixException(
+ "{0} is not a resource group of {1}".format(resource, cluster))
+
+ data = {"command": "rebalance",
+ "replicas": replicas}
+
+ if key:
+ data["key"] = key
+ page = self._post_payload("/clusters/{0}/resourceGroups/{1}/idealState".format(
+ cluster, resource), data)
+ return page
- return _post_payload(host, "/clusters/{0}/StateModelDefs".format(cluster),
- data, newStateModelDef=newstate)
+ def activate_cluster(self, cluster, grand_cluster, enabled=True):
+ """Activate (or, with enabled=False, deactivate) a cluster.
+
+ Posts an "activateCluster" command naming grand_cluster to the
+ cluster's page and returns the resulting page.  Raises
+ HelixException when grand_cluster is not a known cluster.
+ """
+ if grand_cluster not in self.get_clusters():
+ raise HelixException(
+ "grand cluster {0} does not exist".format(grand_cluster))
+ data = {'command': 'activateCluster',
+ 'grandCluster': grand_cluster}
-def del_instance(host, cluster, instance):
- """delete instance"""
- if instance not in [x["id"] for x in get_instances(host, cluster)]:
- raise HelixDoesNotExistException(
- "Instance {0} does not exist.".format(instance))
+ # The flag is sent as the string "true"/"false", not as a boolean.
+ if enabled:
+ data["enabled"] = "true"
+ else:
+ data["enabled"] = "false"
- page = _delete_page(host,
- "/clusters/{0}/instances/{1}".format(cluster,
- instance))
- return page
+ page = self._post_payload("/clusters/{0}".format(cluster), data)
+ return page
+    def deactivate_cluster(self, cluster, grand_cluster):
+        """Deactivate the cluster with the grand cluster.
+
+        Convenience wrapper around activate_cluster(..., enabled=False);
+        returns whatever that call returns.
+        """
+        # Must go through self: activate_cluster is a method, not a
+        # module-level function.
+        return self.activate_cluster(cluster, grand_cluster, enabled=False)
+
+    def add_resource(self, cluster, resource, partitions, state_model_def, mode=""):
+        """Create a new resource group in a cluster.
+
+        Posts an "addResource" command with the partition count and state
+        model reference (plus the mode, when given) and returns the
+        resulting page.  Raises HelixAlreadyExistsException when the
+        resource group is already present.
+        """
+        if resource in self.get_resource_groups(cluster):
+            raise HelixAlreadyExistsException(
+                "ResourceGroup {0} already exists".format(resource))
+
+        payload = dict(command="addResource",
+                       resourceGroupName=resource,
+                       partitions=partitions,
+                       stateModelDefRef=state_model_def)
+        if mode:
+            payload["mode"] = mode
+
+        target = "/clusters/{0}/resourceGroups".format(cluster)
+        return self._post_payload(target, payload)
+
+    def enable_resource(self, cluster, resource, enabled=True):
+        """Enable (default) or disable a resource group.
+
+        Posts an "enableResource" command whose "enabled" field is the
+        string "true" or "false" and returns the resulting page.
+        """
+        payload = {"command": "enableResource",
+                   "enabled": "true" if enabled else "false"}
+        url = "/clusters/{0}/resourceGroups/{1}".format(cluster, resource)
+        return self._post_payload(url, payload)
+
+    def disable_resource(self, cluster, resource):
+        """Disable a resource group.
+
+        Convenience wrapper around enable_resource(..., enabled=False);
+        returns whatever that call returns.
+        """
+        # Must go through self: enable_resource is a method, not a
+        # module-level function.
+        return self.enable_resource(cluster, resource, enabled=False)
+
+    def alter_ideal_state(self, cluster, resource, newstate):
+        """Replace the ideal state of a resource group.
+
+        Posts an "alterIdealState" command to the resource's idealState
+        page, handing newstate over as the newIdealState keyword, and
+        returns the resulting page.
+        """
+        url = "/clusters/{0}/resourceGroups/{1}/idealState".format(
+            cluster, resource)
+        payload = {"command": "alterIdealState"}
+        return self._post_payload(url, payload, newIdealState=newstate)
+
+    def enable_instance(self, cluster, instance, enabled=True):
+        """Enable (default) or disable an instance within a cluster.
+
+        Posts an "enableInstance" command whose "enabled" field is the
+        string "true" or "false" and returns the resulting page.
+        """
+        payload = {"command": "enableInstance",
+                   "enabled": "true" if enabled else "false"}
+        url = "/clusters/{0}/instances/{1}".format(cluster, instance)
+        return self._post_payload(url, payload)
+
+    def disable_instance(self, cluster, instance):
+        """Disable an instance within a cluster.
+
+        Convenience wrapper around enable_instance(..., enabled=False);
+        returns whatever that call returns.
+        """
+        # Must go through self: enable_instance is a method, not a
+        # module-level function.
+        return self.enable_instance(cluster, instance, enabled=False)
+
+    def swap_instance(self, cluster, old, new):
+        """Swap one instance of a cluster for another.
+
+        Posts a "swapInstance" command naming the old and the new instance
+        to the cluster's instances page and returns the resulting page.
+        """
+        data = {"command": "swapInstance",
+                "oldInstance": old,
+                "newInstance": new}
+
+        # "/clusters/..." (plural), like every other instance endpoint used
+        # in this class; the singular "/cluster/..." was a typo.
+        return self._post_payload("/clusters/{0}/instances".format(cluster),
+                                  data)
+
+    def enable_partition(self, cluster, resource, partition, instance,
+                         enabled=True):
+        """Enable (default) or disable a partition on an instance.
+
+        Posts an "enablePartition" command to the instance's page and
+        returns the resulting page.  Raises HelixDoesNotExistException
+        when the resource group does not exist.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        # Send the flag as "true"/"false", the same encoding the other
+        # enable_* methods of this class use, rather than a raw Python bool.
+        data = {"command": "enablePartition",
+                "resource": resource,
+                "partition": partition,
+                "enabled": "true" if enabled else "false"}
+        return self._post_payload(
+            "/clusters/{0}/instances/{1}".format(cluster, instance), data)
+
+    def disable_partition(self, cluster, resource, partitions, instance):
+        """Disable a partition on an instance.
+
+        Convenience wrapper around enable_partition(..., enabled=False);
+        "partitions" is passed through as that method's partition argument.
+        """
+        # Must go through self: enable_partition is a method, not a
+        # module-level function.
+        return self.enable_partition(cluster, resource, partitions, instance,
+                                     enabled=False)
+
+    def reset_partition(self, cluster, resource, partitions, instance):
+        """Reset partitions of a resource on an instance.
+
+        The partition names are joined with single spaces and posted in a
+        "resetPartition" command to the instance's page.  Raises
+        HelixDoesNotExistException when the resource group does not exist.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        joined = " ".join(partitions)
+        payload = dict(command="resetPartition",
+                       resource=resource,
+                       partition=joined)
+        url = "/clusters/{0}/instances/{1}".format(cluster, instance)
+        return self._post_payload(url, payload)
+
+    def reset_resource(self, cluster, resource):
+        """Reset a resource group.
+
+        Posts a "resetResource" command to the resource group's page and
+        returns the resulting page.  Raises HelixDoesNotExistException
+        when the resource group does not exist.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        url = "/clusters/{0}/resourceGroups/{1}".format(cluster, resource)
+        return self._post_payload(url, {"command": "resetResource"})
+
+    def reset_instance(self, cluster, instance):
+        """Reset an instance.
+
+        Posts a "resetInstance" command to the instance's page and returns
+        the resulting page.  Raises HelixDoesNotExistException when no
+        instance with that id is registered in the cluster.
+        """
+        # get_instances() yields one dict per instance, so compare against
+        # the "id" fields (as del_instance does), not the dicts themselves.
+        if instance not in [x["id"] for x in self.get_instances(cluster)]:
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist".format(instance))
+
+        data = {"command": "resetInstance"}
+        return self._post_payload(
+            "/clusters/{0}/instances/{1}".format(cluster, instance), data)
+
+    def add_instance_tag(self, cluster, instance, tag):
+        """Attach a tag to an instance.
+
+        Posts an "addInstanceTag" command to the instance's page and
+        returns the resulting page.
+        """
+        payload = dict(command="addInstanceTag", instanceGroupTag=tag)
+        url = "/clusters/{0}/instances/{1}".format(cluster, instance)
+        return self._post_payload(url, payload)
+
+    def del_instance_tag(self, cluster, instance, tag):
+        """Remove a tag from an instance.
+
+        Posts a "removeInstanceTag" command to the instance's page and
+        returns the resulting page.
+        """
+        payload = dict(command="removeInstanceTag", instanceGroupTag=tag)
+        url = "/clusters/{0}/instances/{1}".format(cluster, instance)
+        return self._post_payload(url, payload)
+
+    def add_resource_tag(self, cluster, resource, tag):
+        """Attach an instance group tag to a resource group.
+
+        Posts an "addResourceProperty" command setting INSTANCE_GROUP_TAG
+        to the resource's idealState page and returns the resulting page.
+        Raises HelixDoesNotExistException when the resource group does not
+        exist.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+        payload = dict(command="addResourceProperty", INSTANCE_GROUP_TAG=tag)
+        url = "/clusters/{0}/resourceGroups/{1}/idealState".format(
+            cluster, resource)
+        return self._post_payload(url, payload)
-def del_resource(host, cluster, resource):
- """delete specified resource from cluster"""
- if resource not in get_resource_groups(host, cluster):
- raise HelixDoesNotExistException(
- "ResourceGroup {0} does not exist".format(resource))
+ """
+ del resource currently does not exist in helix api
+ def del_resource_tag(self, cluster, resource, tag):
+ if resource not in self.get_resource_groups(host, cluster):
+ raise HelixDoesNotExistException(
+ "ResourceGroup {0} does not exist".format(resource))
+
+ data = {"command": "removeResourceProperty",
+ "INSTANCE_GROUP_TAG": tag}
+ return _post_payload(host,
+ "/clusters/{0}/resourceGroups/{1}/idealState".format(
+ cluster, resource), data)
+ """
- page = _delete_page(host, "/clusters/{0}/resourceGroups/{1}".format(
- cluster, resource))
- return page
+    def get_instance_taginfo(self, cluster):
+        """Return the "tagInfo" entry of the cluster's instances page."""
+        page = self._get_page("/clusters/{0}/instances".format(cluster))
+        return page["tagInfo"]
+
+    def expand_cluster(self, cluster):
+        """Post an "expandCluster" command to the cluster's page."""
+        url = "/clusters/{0}/".format(cluster)
+        return self._post_payload(url, {"command": "expandCluster"})
+
+    def expand_resource(self, cluster, resource):
+        """Expand a resource group.
+
+        Posts an "expandResource" command to the resource's idealState
+        page and returns the resulting page.
+        """
+        data = {"command": "expandResource"}
+
+        # "resourceGroups" (plural) is the segment every other idealState
+        # call in this class uses; the singular form was a typo.
+        return self._post_payload(
+            "/clusters/{0}/resourceGroups/{1}/idealState".format(
+                cluster, resource), data)
+
+    def add_resource_property(self, cluster, resource, properties):
+        """Add properties to a resource group's ideal state.
+
+        Args:
+            cluster: cluster name.
+            resource: resource group name.
+            properties: dictionary of property names to values.  It is
+                copied, so the caller's dictionary is left untouched.
+
+        Returns:
+            The page returned by the addResourceProperty POST.
+        """
+        # Work on a copy so the "command" key does not leak into the
+        # dictionary owned by the caller.
+        data = dict(properties)
+        data["command"] = "addResourceProperty"
+
+        # "resourceGroups" (plural) is the segment every other idealState
+        # call in this class uses; the singular form was a typo.
+        return self._post_payload(
+            "/clusters/{0}/resourceGroups/{1}/idealState".format(
+                cluster, resource), data)
+
+    def _handle_config(self, cluster, configs, command, participant=None,
+                       resource=None):
+        """Post a "<command>Config" request for one config scope.
+
+        configs is rendered as comma separated "key=value" pairs.  The
+        scope is the participant when one is given, otherwise the resource
+        when one is given, otherwise the cluster itself.
+        """
+        pairs = []
+        for name, value in configs.items():
+            pairs.append("{0}={1}".format(name, value))
+        payload = {"command": "{0}Config".format(command),
+                   "configs": ",".join(pairs)}
+
+        if participant:
+            scope = "participant/{0}".format(participant)
+        elif resource:
+            scope = "resource/{0}".format(resource)
+        else:
+            scope = "cluster"
+
+        url = "/clusters/{0}/configs/".format(cluster) + scope
+        return self._post_payload(url, payload)
+
+    def set_config(self, cluster, configs, participant=None, resource=None):
+        """Set config values; delegates to _handle_config with "set"."""
+        result = self._handle_config(cluster, configs, "set", participant,
+                                     resource)
+        return result
+
+    def remove_config(self, cluster, configs, participant=None, resource=None):
+        """Remove config values; delegates to _handle_config with "remove".
+
+        Takes the same arguments as set_config and returns the page
+        produced by the POST.
+        """
+        # Argument order must match _handle_config(cluster, configs,
+        # command, participant, resource); there is no "host" argument any
+        # more now that the host lives on the instance.
+        return self._handle_config(cluster, configs, "remove", participant,
+                                   resource)
+
+    def get_zk_path(self, path):
+        """Fetch the zkPath page for a zookeeper path."""
+        url = "zkPath/{0}".format(path)
+        return self._get_page(url)
+
+    def del_zk_path(self, path):
+        """Delete the zkPath page for a zookeeper path."""
+        url = "zkPath/{0}".format(path)
+        return self._delete_page(url)
+
+    def get_zk_child(self, path):
+        """Fetch the zkChild page for a zookeeper path."""
+        url = "zkChild/{0}".format(path)
+        return self._get_page(url)
+
+    def del_zk_child(self, path):
+        """Delete the zkChild page for a zookeeper path."""
+        url = "zkChild/{0}".format(path)
+        return self._delete_page(url)
+
+    def add_state_model(self, cluster, newstate):
+        """Register a state model definition with a cluster.
+
+        Posts an "addStateModel" command to the cluster's StateModelDefs
+        page, handing newstate over as the newStateModelDef keyword.
+        """
+        url = "/clusters/{0}/StateModelDefs".format(cluster)
+        payload = {"command": "addStateModel"}
+        return self._post_payload(url, payload, newStateModelDef=newstate)
+
+    def del_instance(self, cluster, instance):
+        """Delete an instance from a cluster.
+
+        Raises HelixDoesNotExistException when no registered instance has
+        that id; otherwise returns the page of the DELETE request.
+        """
+        known_ids = [entry["id"] for entry in self.get_instances(cluster)]
+        if instance not in known_ids:
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist.".format(instance))
+
+        url = "/clusters/{0}/instances/{1}".format(cluster, instance)
+        return self._delete_page(url)
+ def del_resource(self, cluster, resource):
+ """Delete the specified resource group from a cluster.
+
+ Raises HelixDoesNotExistException when the resource group does not
+ exist; otherwise returns the page of the DELETE request.
+ """
+ if resource not in self.get_resource_groups(cluster):
+ raise HelixDoesNotExistException(
+ "ResourceGroup {0} does not exist".format(resource))
-def del_cluster(host, cluster):
- """delete cluster"""
- page = _delete_page(host, "/clusters/{0}".format(cluster))
+ page = self._delete_page("/clusters/{0}/resourceGroups/{1}".format(
+ cluster, resource))
+ return page
- return page
+    def del_cluster(self, cluster):
+        """Delete a cluster and return the page of the DELETE request."""
+        url = "/clusters/{0}".format(cluster)
+        return self._delete_page(url)
-def send_message(host, cluster, path, **kwargs):
- pass
+    def send_message(self, cluster, path, **kwargs):
+        """Placeholder: sending messages is not implemented; does nothing."""
+        return None
http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/statemodeldefs.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/statemodeldefs.py b/contributors/py-helix-admin/helix/statemodeldefs.py
new file mode 100644
index 0000000..8446ae9
--- /dev/null
+++ b/contributors/py-helix-admin/helix/statemodeldefs.py
@@ -0,0 +1,39 @@
+from ordereddict import OrderedDict
+
+# These essentially come from the java classes defined here. It is cheesey and should probably come from a configuration file.
+# https://github.com/linkedin/helix/blob/master/helix-core/src/main/java/com/linkedin/helix/tools/StateModelConfigGenerator.java
+
+# Transition tables: for a current state, map each target state to the next
+# state to move through on the way there.
+LEADER_NEXT = OrderedDict([
+    ("DROPPED", "STANDBY"),
+    ("STANDBY", "STANDBY"),
+    ("OFFLINE", "STANDBY"),
+])
+OFFLINE_NEXT = OrderedDict([
+    ("LEADER", "STANDBY"),
+    ("DROPPED", "DROPPED"),
+    ("STANDBY", "STANDBY"),
+])
+STANDBY_NEXT = OrderedDict([
+    ("LEADER", "LEADER"),
+    ("DROPPED", "OFFLINE"),
+    ("OFFLINE", "OFFLINE"),
+])
+
+# Per-state metadata and transition tables, in serialization order.
+MAP_FIELDS = OrderedDict([
+    ("DROPPED.meta", {"count": "-1"}),
+    ("LEADER.meta", {"count": "1"}),
+    ("LEADER.next", LEADER_NEXT),
+    ("OFFLINE.meta", {"count": "-1"}),
+    ("OFFLINE.next", OFFLINE_NEXT),
+    ("STANDBY.meta", {"count": "R"}),
+    ("STANDBY.next", STANDBY_NEXT),
+])
+
+LIST_FIELDS = OrderedDict([
+    ("STATE_PRIORITY_LIST", ["LEADER", "STANDBY", "OFFLINE", "DROPPED"]),
+    ("STATE_TRANSITION_PRIORITYLIST", ["LEADER-STANDBY", "STANDBY-LEADER", "OFFLINE-STANDBY", "STANDBY-OFFLINE", "OFFLINE-DROPPED"]),
+])
+
+SIMPLE_FIELDS = OrderedDict([("INITIAL_STATE", "OFFLINE")])
+
+# The complete LeaderStandby definition; key order is id, mapFields,
+# listFields, simpleFields.
+LEADER_STANDBY_STATE_DEF = OrderedDict([
+    ("id", "LeaderStandby"),
+    ("mapFields", MAP_FIELDS),
+    ("listFields", LIST_FIELDS),
+    ("simpleFields", SIMPLE_FIELDS),
+])
+
+# State model definitions by name.
+STATE_DEF_MAP = {"LeaderStandby": LEADER_STANDBY_STATE_DEF}
http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/test/test_helix.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/test/test_helix.py b/contributors/py-helix-admin/helix/test/test_helix.py
new file mode 100755
index 0000000..f466086
--- /dev/null
+++ b/contributors/py-helix-admin/helix/test/test_helix.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python2.6
+
+from helixexceptions import HelixException
+from helixexceptions import HelixAlreadyExistsException
+from helixexceptions import HelixDoesNotExistException
+
+from participant import Participant
+from partition import Partition
+from resourcegroup import ResourceGroup
+
+from functions import RestHelixFunctions
+from zkfunctions import ZookeeperHelixFunctions
+
+from cluster import ZKCluster
+from cluster import Cluster
+
+import pytest
+import random
+
+# Fixture values used by TestHelixAdmin._cluster_actions.
+# INSTANCE_ID is INSTANCE_NAME and INSTANCE_PORT joined by an underscore.
+INSTANCE_ID = "fake_12345"
+INSTANCE_NAME = "fake"
+INSTANCE_PORT = 12345
+PARTITION_COUNT = 5
+REPLICA_COUNT = 1
+STATEMODELDEF = "LeaderStandby"
+REBALANCE_MODE = "FULL_AUTO"
+RESOURCE_NAME = "fake_resource"
+TAG_NAME = "fake_tag"
+
+# CLUSTER_ID is a template; {id} is filled with a random number per test.
+CLUSTER_ID = "helix_{id}"
+# Connection settings: the tests talk to services on localhost.
+ZOOKEEPER_ROOT = "/testing_helix"
+ZOOKEEPER_HOST = "localhost:2181"
+REST_HOST = "localhost:8100"
+
+class TestHelixAdmin(object):
+    """Runs one add/tag/rebalance/delete scenario against both cluster types."""
+
+    #@pytest.mark.int
+    def test_zookeeper_cluster(self):
+        """Drive the scenario through the zookeeper backed cluster."""
+        name = self._get_cluster_name()
+        self._cluster_actions(ZKCluster(ZOOKEEPER_HOST, ZOOKEEPER_ROOT, name))
+
+    #@pytest.mark.int
+    def test_rest_cluster(self):
+        """Drive the scenario through the REST backed cluster."""
+        name = self._get_cluster_name()
+        self._cluster_actions(Cluster(REST_HOST, name))
+
+    def _get_cluster_name(self):
+        """Return a cluster name with a random numeric suffix."""
+        suffix = random.randint(1, 1000000)
+        return CLUSTER_ID.format(id=suffix)
+
+    def _cluster_actions(self, cluster):
+        """Create a cluster, populate it, then tear everything down again."""
+        # Build up: cluster, resource, instance, tags, rebalance.
+        cluster.add_cluster()
+        cluster.add_resource(RESOURCE_NAME, PARTITION_COUNT, STATEMODELDEF,
+                             mode=REBALANCE_MODE)
+        cluster.add_instance(INSTANCE_NAME, INSTANCE_PORT)
+        cluster.add_instance_tag(INSTANCE_ID, TAG_NAME)
+        cluster.add_resource_tag(RESOURCE_NAME, TAG_NAME)
+        cluster.rebalance(RESOURCE_NAME, REPLICA_COUNT)
+
+        # Tear down: instance, resource, cluster.
+        cluster.del_instance(cluster.participants.get(INSTANCE_ID))
+        cluster.del_resource(cluster.resources.get(RESOURCE_NAME))
+        cluster.del_cluster()
http://git-wip-us.apache.org/repos/asf/helix/blob/a714f002/contributors/py-helix-admin/helix/zkfunctions.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/zkfunctions.py b/contributors/py-helix-admin/helix/zkfunctions.py
new file mode 100644
index 0000000..aed2b74
--- /dev/null
+++ b/contributors/py-helix-admin/helix/zkfunctions.py
@@ -0,0 +1,522 @@
+"""library to handle helix commands"""
+# XXX: Explore using zookeeper transactions for some of the write operations into zookeeper.
+# Currently, it looks like ensure_path and create with the make_path argument are not supported in transactions so it isn't usable out of the box.
+import json
+from ordereddict import OrderedDict
+from kazoo.client import KazooClient
+
+from statemodeldefs import STATE_DEF_MAP
+from helixexceptions import HelixException
+from helixexceptions import HelixAlreadyExistsException
+from helixexceptions import HelixDoesNotExistException
+from kazoo.exceptions import NodeExistsError
+
+# Rebalance modes accepted by add_resource.
+RESOURCE_MODES = ["FULL_AUTO", "CUSTOMIZED", "SEMI_AUTO", "USER_DEFINED"]
+
+# Zookeeper path templates.  They are relative paths: _build_path prefixes
+# them with the configured zookeeper root after the placeholders are filled.
+IDEAL_STATE_PATH = "/{clusterName}/IDEALSTATES"
+RESOURCE_IDEAL_STATE_PATH = "/{clusterName}/IDEALSTATES/{resourceName}"
+EXTERNAL_VIEW_STATE_PATH = "/{clusterName}/EXTERNALVIEW/{resourceName}"
+INSTANCE_PATH = "/{clusterName}/INSTANCES"
+PARTICIPANT_CONFIG_PATH = "/{clusterName}/CONFIGS/PARTICIPANT/{instanceName}"
+PARTICIPANTS_CONFIG_PATH = "/{clusterName}/CONFIGS/PARTICIPANT"
+CLUSTER_CONFIG_PATH = "/{clusterName}/CONFIGS/CLUSTER/{clusterName}"
+CONFIG_PATH = "/{clusterName}/CONFIGS/{configName}/{entityName}"
+STATE_MODEL_DEF_PATH = "/{clusterName}/STATEMODELDEFS/{stateModelName}"
+LIVE_INSTANCE_PATH = "/{clusterName}/LIVEINSTANCES/{instanceName}"
+
+# Path templates grouped by entity type.  The "cluster" list is what
+# add_cluster creates and _is_valid_cluster checks; the "instance" list is
+# what add_instance creates and del_instance removes.
+HELIX_ZOOKEEPER_PATHS = {
+ "cluster": [
+ "/{clusterName}/CONFIGS",
+ "/{clusterName}/CONFIGS/RESOURCE",
+ "/{clusterName}/CONFIGS/CLUSTER",
+ PARTICIPANTS_CONFIG_PATH,
+ "/{clusterName}/LIVEINSTANCES",
+ INSTANCE_PATH,
+ IDEAL_STATE_PATH,
+ #"/{clusterName}/RESOURCEASSIGNMENTS",
+ "/{clusterName}/EXTERNALVIEW",
+ "/{clusterName}/STATEMODELDEFS",
+ "/{clusterName}/CONTROLLER",
+ "/{clusterName}/CONTROLLER/HISTORY",
+ "/{clusterName}/CONTROLLER/ERRORS",
+ "/{clusterName}/CONTROLLER/MESSAGES",
+ "/{clusterName}/CONTROLLER/STATUSUPDATES",
+ "/{clusterName}/PROPERTYSTORE",
+ ],
+ "resource": [
+ RESOURCE_IDEAL_STATE_PATH,
+ "/{clusterName}/RESOURCEASSIGNMENTS/{resourceName}",
+ EXTERNAL_VIEW_STATE_PATH
+ ],
+ "instance": [
+ #"/{clusterName}/LIVEINSTANCES/{instanceName}",
+ "/{clusterName}/INSTANCES/{instanceName}",
+ "/{clusterName}/INSTANCES/{instanceName}/CURRENTSTATES",
+ "/{clusterName}/INSTANCES/{instanceName}/ERRORS",
+ "/{clusterName}/INSTANCES/{instanceName}/STATUSUPDATES",
+ "/{clusterName}/INSTANCES/{instanceName}/MESSAGES"
+ ],
+ "statemodel": [
+ STATE_MODEL_DEF_PATH
+ ]
+}
+
+# Cluster config record; add_cluster serializes it to CLUSTER_CONFIG_PATH
+# with "id" set to the cluster name.
+CLUSTER_CONFIG_TEMPLATE = OrderedDict()
+CLUSTER_CONFIG_TEMPLATE["id"] = "{clusterName}"
+CLUSTER_CONFIG_TEMPLATE["mapFields"] = {}
+CLUSTER_CONFIG_TEMPLATE["listFields"] = {}
+CLUSTER_CONFIG_TEMPLATE["simpleFields"] = {"allowParticipantAutoJoin": "true"}
+
+
+class ZookeeperHelixFunctions(object):
+    """Zookeeper based client to manage helix clusters.
+
+    Reads and writes the cluster znodes described by this module's path
+    templates directly through kazoo.  Every path is prefixed with
+    ``zk_root``.  Operations that have no zookeeper implementation raise
+    NotImplementedError.
+    """
+
+    def __init__(self, zookeeper_connect_string, zk_root):
+        """Start a kazoo client for the given hosts and store the root path."""
+        self.zk = KazooClient(hosts=zookeeper_connect_string)
+        self.zk.start()
+        self.zk_root = zk_root
+
+    def _list_path(self, path):
+        """Return the children of an already built zookeeper path."""
+        return self.zk.get_children(path)
+
+    def _build_path(self, path):
+        """Prefix a relative path with the zookeeper root."""
+        return "".join([self.zk_root, path])
+
+    def _path(self, template, **names):
+        """Fill in a path template and prefix it with the zookeeper root."""
+        return self._build_path(template.format(**names))
+
+    def _is_valid_cluster(self, cluster):
+        """Return True when every required cluster znode exists."""
+        for template in HELIX_ZOOKEEPER_PATHS.get("cluster"):
+            if not self.zk.exists(self._path(template, clusterName=cluster)):
+                return False
+        return True
+
+    def _check_cluster_exists(self, cluster):
+        """Raise HelixDoesNotExistException unless the cluster is known."""
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist.".format(cluster))
+
+    def _check_resource_exists(self, cluster, resource):
+        """Raise HelixDoesNotExistException unless the resource is known."""
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixDoesNotExistException(
+                "ResourceGroup {0} does not exist".format(resource))
+
+    def _instance_ids(self, cluster):
+        """Return the ids of all instances registered to the cluster."""
+        return [x["id"] for x in self.get_instances(cluster)]
+
+    @classmethod
+    def _build_instance_entry(cls, instance, enabled="true"):
+        """Create the config record for a "host:port" instance."""
+        host, port = instance.split(":")
+        instance_data = OrderedDict()
+        instance_data["id"] = "{host}_{port}".format(host=host, port=port)
+        instance_data["listFields"] = {}
+        instance_data["mapFields"] = {}
+        instance_data["simpleFields"] = OrderedDict()
+        instance_data["simpleFields"]["HELIX_ENABLED"] = enabled
+        instance_data["simpleFields"]["HELIX_HOST"] = host
+        instance_data["simpleFields"]["HELIX_PORT"] = port
+        return instance_data
+
+    def create_root(self):
+        """Create the zookeeper root znode if it is missing; return True."""
+        path = self._build_path("")
+        if not self.zk.exists(path):
+            self.zk.create(path)
+        return True
+
+    def get_clusters(self):
+        """Return the names of all valid clusters below the root."""
+        if not self.zk.exists(self.zk_root):
+            return []
+        return [cluster for cluster in self._list_path(self.zk_root)
+                if self._is_valid_cluster(cluster)]
+
+    def get_resource_groups(self, cluster):
+        """Return the resource group names of the cluster."""
+        return self._list_path(
+            self._path(IDEAL_STATE_PATH, clusterName=cluster))
+
+    def get_resource_tags(self, cluster):
+        """Return a dict mapping each resource tag to its resource names."""
+        resource_tags = {}
+        for resource in self.get_resource_groups(cluster):
+            resource_data, _ = self._get_resource_group(cluster, resource)
+            # add_resource_tag stores the tag under simpleFields, so that
+            # is where it has to be read from.
+            tag = resource_data.get("simpleFields", {}).get(
+                "INSTANCE_GROUP_TAG")
+            if tag:
+                # Several resources may share a tag; collect them all.
+                resource_tags.setdefault(tag, []).append(resource)
+        return resource_tags
+
+    def _get_resource_group(self, cluster, resource):
+        """Return (ideal state record, znode stat) for a resource group.
+
+        Raises HelixException when the resource is not a resource group of
+        the cluster.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{resource} is not a resource group of {cluster}".format(
+                    resource=resource, cluster=cluster))
+
+        data, stat = self.zk.get(self._path(
+            RESOURCE_IDEAL_STATE_PATH, clusterName=cluster,
+            resourceName=resource))
+        return (json.loads(data), stat)
+
+    def get_resource_group(self, cluster, resource):
+        """COMPAT: return only the ideal state record of a resource group."""
+        return self._get_resource_group(cluster, resource)[0]
+
+    def _get_ideal_state(self, cluster, resource):
+        """Return (mapFields of the ideal state, znode stat).
+
+        Raises HelixException when the resource is not a resource group of
+        the cluster.
+        """
+        # _get_resource_group returns a (record, stat) tuple; unpack it
+        # before indexing so get_ideal_state can take element [0].
+        data, stat = self._get_resource_group(cluster, resource)
+        return (data["mapFields"], stat)
+
+    def get_ideal_state(self, cluster, resource):
+        """COMPAT: return only the mapFields of the ideal state."""
+        return self._get_ideal_state(cluster, resource)[0]
+
+    def _get_external_view(self, cluster, resource):
+        """Return (mapFields of the external view, znode stat).
+
+        Raises HelixException when the resource is not a resource group of
+        the cluster.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
+        data, stat = self.zk.get(self._path(
+            EXTERNAL_VIEW_STATE_PATH, clusterName=cluster,
+            resourceName=resource))
+        return (json.loads(data)["mapFields"], stat)
+
+    def get_external_view(self, cluster, resource):
+        """COMPAT: return only the mapFields of the external view."""
+        return self._get_external_view(cluster, resource)[0]
+
+    def get_instances(self, cluster):
+        """Return the config records of all instances of the cluster.
+
+        Each record gets simpleFields["Alive"] set to "true" or "false"
+        depending on whether a live-instance znode exists for it.
+        """
+        if not cluster:
+            raise HelixException("Cluster must be set before "
+                                 "calling this function")
+
+        instances = []
+        participants = self._path(PARTICIPANTS_CONFIG_PATH,
+                                  clusterName=cluster)
+        for instance in self._list_path(participants):
+            instance_data = self._get_instance_detail(cluster, instance)[0]
+            alive = self.zk.exists(self._path(
+                LIVE_INSTANCE_PATH, clusterName=cluster,
+                instanceName=instance))
+            instance_data["simpleFields"]["Alive"] = (
+                "true" if alive else "false")
+            instances.append(instance_data)
+        return instances
+
+    def _get_instance_detail(self, cluster, instance):
+        """Return (config record, znode stat) of an instance."""
+        data, stat = self.zk.get(self._path(
+            PARTICIPANT_CONFIG_PATH, clusterName=cluster,
+            instanceName=instance))
+        return (json.loads(data), stat)
+
+    def get_instance_detail(self, cluster, instance):
+        """COMPAT: return only the config record of an instance."""
+        return self._get_instance_detail(cluster, instance)[0]
+
+    def _get_config(self, cluster, config, entity):
+        """Return (config record, znode stat) for a config entity."""
+        data, stat = self.zk.get(self._path(
+            CONFIG_PATH, clusterName=cluster, configName=config,
+            entityName=entity))
+        return (json.loads(data), stat)
+
+    def get_config(self, cluster, config, entity):
+        """COMPAT: return only the requested config record."""
+        return self._get_config(cluster, config, entity)[0]
+
+    def add_cluster(self, cluster):
+        """Create the znode layout of a new cluster; return True.
+
+        Raises HelixAlreadyExistsException when the cluster already exists.
+        """
+        if cluster in self.get_clusters():
+            raise HelixAlreadyExistsException(
+                "Cluster {0} already exists".format(cluster))
+
+        for template in HELIX_ZOOKEEPER_PATHS.get("cluster"):
+            self.zk.ensure_path(self._path(template, clusterName=cluster))
+
+        # Copy the template so the module-level constant keeps its
+        # placeholder id instead of the name of the last cluster added.
+        data = OrderedDict(CLUSTER_CONFIG_TEMPLATE)
+        data["id"] = cluster
+
+        try:
+            self.zk.create(
+                self._path(CLUSTER_CONFIG_PATH, clusterName=cluster),
+                json.dumps(data))
+        except NodeExistsError:
+            # Ignore existing cluster
+            pass
+
+        # Insert state defs if they don't exist
+        for name, definition in STATE_DEF_MAP.items():
+            def_path = self._path(STATE_MODEL_DEF_PATH, clusterName=cluster,
+                                  stateModelName=name)
+            if not self.zk.exists(def_path):
+                self.zk.create(def_path, json.dumps(definition))
+
+        return True
+
+    def add_instance(self, cluster, instances, port):
+        """Add one or more instances to a cluster; return True.
+
+        Args:
+            cluster: name of an existing cluster.
+            instances: a host name, or a list/tuple of host names.
+            port: port combined with every host as "host:port".
+
+        Raises:
+            HelixDoesNotExistException: the cluster does not exist.
+            HelixAlreadyExistsException: every requested instance is
+                already registered.
+        """
+        if cluster not in self.get_clusters():
+            raise HelixDoesNotExistException(
+                "Cluster {0} does not exist".format(cluster))
+
+        if not isinstance(instances, (list, tuple)):
+            instances = [instances]
+        requested = ["{instance}:{port}".format(instance=instance, port=port)
+                     for instance in instances]
+
+        try:
+            # Ids look like "host_port"; only the LAST underscore separates
+            # host from port, so hosts containing "_" still match.
+            existing = set(":".join(x["id"].rsplit("_", 1))
+                           for x in self.get_instances(cluster))
+        except HelixException:
+            # Thrown when there are no instances yet, which is expected
+            # while a cluster is first being populated.
+            existing = set()
+
+        # Drop already-registered and duplicate names, keeping caller order.
+        pending = []
+        for name in requested:
+            if name not in existing and name not in pending:
+                pending.append(name)
+
+        if not pending:
+            raise HelixAlreadyExistsException(
+                "All instances given already exist in cluster")
+
+        for instance in pending:
+            data = self._build_instance_entry(instance)
+            node_name = instance.replace(':', '_')
+            self.zk.create(
+                self._path(PARTICIPANT_CONFIG_PATH, clusterName=cluster,
+                           instanceName=node_name),
+                json.dumps(data))
+            for template in HELIX_ZOOKEEPER_PATHS.get("instance"):
+                self.zk.ensure_path(self._path(
+                    template, clusterName=cluster, instanceName=node_name))
+        return True
+
+    def rebalance(self, cluster, resource, replicas, key=""):
+        """Store a new replica count on a resource group; return True.
+
+        Raises HelixException when the resource is not a resource group of
+        the cluster, and NotImplementedError when a key is given.
+        """
+        if resource not in self.get_resource_groups(cluster):
+            raise HelixException(
+                "{0} is not a resource group of {1}".format(resource, cluster))
+
+        # TODO: key usage is currently not supported.
+        if not key == "":
+            raise NotImplementedError
+
+        resource_data, _ = self._get_resource_group(cluster, resource)
+        resource_data["simpleFields"]["REPLICAS"] = replicas
+        self.zk.set(
+            self._path(RESOURCE_IDEAL_STATE_PATH, clusterName=cluster,
+                       resourceName=resource),
+            json.dumps(resource_data))
+
+        return True
+
+    def activate_cluster(self, cluster, grand_cluster, enabled=True):
+        """Not implemented; only validates that the grand cluster exists."""
+        if grand_cluster not in self.get_clusters():
+            raise HelixException(
+                "grand cluster {0} does not exist".format(grand_cluster))
+
+        raise NotImplementedError
+
+    def deactivate_cluster(self, cluster, grand_cluster):
+        """Wrapper around activate_cluster(..., enabled=False)."""
+        return self.activate_cluster(cluster, grand_cluster, enabled=False)
+
+    def add_resource(self, cluster, resource, partitions,
+                     state_model_def, mode="", state_model_factory_name="DEFAULT"):
+        """Create the ideal state znode of a new resource group; return True.
+
+        Raises HelixAlreadyExistsException when the resource group exists
+        and ValueError when a mode outside RESOURCE_MODES is given.
+        """
+        if resource in self.get_resource_groups(cluster):
+            raise HelixAlreadyExistsException(
+                "ResourceGroup {0} already exists".format(resource))
+
+        data = {"id": resource,
+                "mapFields": {},
+                "listFields": {},
+                "simpleFields": {
+                    "IDEAL_STATE_MODE": "AUTO",
+                    "NUM_PARTITIONS": partitions,
+                    "REBALANCE_MODE": mode,
+                    "REPLICAS": "0",
+                    "STATE_MODEL_DEF_REF": state_model_def,
+                    "STATE_MODEL_FACTORY_NAME": state_model_factory_name
+                }
+                }
+
+        if mode:
+            if mode in RESOURCE_MODES:
+                # NOTE(review): the mode is also stored as a top-level key,
+                # next to REBALANCE_MODE in simpleFields; confirm that the
+                # duplicate is wanted.
+                data["mode"] = mode
+            else:
+                raise ValueError("Invalid mode ({mode})".format(mode=mode))
+
+        self.zk.create(
+            self._path(RESOURCE_IDEAL_STATE_PATH, clusterName=cluster,
+                       resourceName=resource),
+            json.dumps(data))
+        return True
+
+    def enable_resource(self, cluster, resource, enabled=True):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def disable_resource(self, cluster, resource):
+        """Wrapper around enable_resource(..., enabled=False)."""
+        return self.enable_resource(cluster, resource, enabled=False)
+
+    def alter_ideal_state(self, cluster, resource, newstate):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def enable_instance(self, cluster, instance, enabled=True):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def disable_instance(self, cluster, instance):
+        """Wrapper around enable_instance(..., enabled=False)."""
+        return self.enable_instance(cluster, instance, enabled=False)
+
+    def swap_instance(self, cluster, old, new):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def enable_partition(self, cluster, resource, partition, instance,
+                         enabled=True):
+        """Not implemented; only validates that the resource exists."""
+        self._check_resource_exists(cluster, resource)
+        raise NotImplementedError
+
+    def disable_partition(self, cluster, resource, partitions, instance):
+        """Wrapper around enable_partition(..., enabled=False)."""
+        return self.enable_partition(cluster, resource, partitions, instance,
+                                     enabled=False)
+
+    def reset_partition(self, cluster, resource, partitions, instance):
+        """Not implemented; only validates that the resource exists."""
+        self._check_resource_exists(cluster, resource)
+        raise NotImplementedError
+
+    def reset_resource(self, cluster, resource):
+        """Not implemented; only validates that the resource exists."""
+        self._check_resource_exists(cluster, resource)
+        raise NotImplementedError
+
+    def reset_instance(self, cluster, instance):
+        """Not implemented; only validates that the instance exists."""
+        # get_instances() yields dicts, so compare against their ids.
+        if instance not in self._instance_ids(cluster):
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist".format(instance))
+
+        raise NotImplementedError
+
+    def add_instance_tag(self, cluster, instance, tag):
+        """Append a tag to the TAG_LIST of an instance; return True.
+
+        Raises HelixAlreadyExistsException when the instance already
+        carries the tag.
+        """
+        instance_data, instance_meta = self._get_instance_detail(
+            cluster, instance)
+        instance_tags = instance_data.get("listFields").get("TAG_LIST", [])
+        if tag in instance_tags:
+            raise HelixAlreadyExistsException(
+                "Tag ({tag}) already exists for instance ({instance}).".format(tag=tag, instance=instance))
+
+        instance_tags.append(tag)
+        instance_data["listFields"]["TAG_LIST"] = instance_tags
+
+        # XXX: Apply some retry logic here
+        # The version argument makes the write fail if the znode changed
+        # since it was read.
+        self.zk.set(
+            self._path(PARTICIPANT_CONFIG_PATH, clusterName=cluster,
+                       instanceName=instance),
+            json.dumps(instance_data), version=instance_meta.version)
+        return True
+
+    def del_instance_tag(self, cluster, instance, tag):
+        """Remove a tag from the TAG_LIST of an instance; return True.
+
+        Mirrors add_instance_tag.  Raises HelixDoesNotExistException when
+        the instance is unknown or does not carry the tag.
+        """
+        if instance not in self._instance_ids(cluster):
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist.".format(instance))
+
+        instance_data, instance_meta = self._get_instance_detail(
+            cluster, instance)
+        instance_tags = instance_data.get("listFields").get("TAG_LIST", [])
+        if tag not in instance_tags:
+            raise HelixDoesNotExistException(
+                "Tag ({tag}) does not exist for instance ({instance}).".format(tag=tag, instance=instance))
+
+        instance_tags.remove(tag)
+        instance_data["listFields"]["TAG_LIST"] = instance_tags
+
+        self.zk.set(
+            self._path(PARTICIPANT_CONFIG_PATH, clusterName=cluster,
+                       instanceName=instance),
+            json.dumps(instance_data), version=instance_meta.version)
+        return True
+
+    def add_resource_tag(self, cluster, resource, tag):
+        """Set INSTANCE_GROUP_TAG on a resource group; return True.
+
+        Raises HelixDoesNotExistException when the resource group does not
+        exist.
+        """
+        self._check_resource_exists(cluster, resource)
+
+        resource_data, resource_stat = self._get_resource_group(
+            cluster, resource)
+        resource_data["simpleFields"]["INSTANCE_GROUP_TAG"] = tag
+
+        self.zk.set(
+            self._path(RESOURCE_IDEAL_STATE_PATH, clusterName=cluster,
+                       resourceName=resource),
+            json.dumps(resource_data), version=resource_stat.version)
+        return True
+
+    def del_resource_tag(self, cluster, resource, tag):
+        """Not implemented; only validates that the resource exists."""
+        self._check_resource_exists(cluster, resource)
+        raise NotImplementedError
+
+    def get_instance_taginfo(self, cluster):
+        """Return a dict mapping each instance tag to the ids carrying it."""
+        instance_tags = {}
+        for instance in self.get_instances(cluster):
+            list_fields = instance.get("listFields")
+            if "TAG_LIST" in list_fields:
+                for tag in list_fields.get("TAG_LIST"):
+                    instance_tags.setdefault(tag, []).append(
+                        instance.get("id"))
+        return instance_tags
+
+    def expand_cluster(self, cluster):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def expand_resource(self, cluster, resource):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def add_resource_property(self, cluster, resource, properties):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def set_config(self, cluster, configs, participant=None, resource=None):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def remove_config(self, cluster, configs, participant=None, resource=None):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def get_zk_path(self, path):
+        """Return what kazoo's get() yields for a path below the root."""
+        return self.zk.get(self._build_path(path))
+
+    def del_zk_path(self, path):
+        """Delete a path below the root."""
+        return self.zk.delete(self._build_path(path))
+
+    def add_state_model(self, cluster, newstate):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
+
+    def del_instance(self, cluster, instance):
+        """Delete an instance's config and znodes; return True.
+
+        Raises HelixDoesNotExistException when the cluster or the instance
+        does not exist.
+        """
+        self._check_cluster_exists(cluster)
+
+        if instance not in self._instance_ids(cluster):
+            raise HelixDoesNotExistException(
+                "Instance {0} does not exist.".format(instance))
+
+        node_name = instance.replace(':', '_')
+        self.zk.delete(self._path(
+            PARTICIPANT_CONFIG_PATH, clusterName=cluster,
+            instanceName=node_name))
+
+        # Reverse zookeeper structure for destruction.  Deleting
+        # recursively also removes children created after add_instance,
+        # which a plain delete would refuse to do.
+        for template in HELIX_ZOOKEEPER_PATHS.get("instance")[::-1]:
+            self.zk.delete(
+                self._path(template, clusterName=cluster,
+                           instanceName=node_name),
+                recursive=True)
+        return True
+
+    def del_resource(self, cluster, resource):
+        """Delete the ideal state znode of a resource group; return True.
+
+        Raises HelixDoesNotExistException when the cluster or the resource
+        group does not exist.
+        """
+        self._check_cluster_exists(cluster)
+        self._check_resource_exists(cluster, resource)
+
+        self.zk.delete(self._path(
+            RESOURCE_IDEAL_STATE_PATH, clusterName=cluster,
+            resourceName=resource))
+        return True
+
+    def del_cluster(self, cluster):
+        """Delete a cluster and everything below it; return True.
+
+        Raises HelixDoesNotExistException when the cluster does not exist.
+        """
+        self._check_cluster_exists(cluster)
+
+        # Remove the whole cluster subtree.  The previous code called
+        # ensure_path here, which re-created the znodes instead of
+        # deleting them.
+        self.zk.delete(self._path("/{clusterName}", clusterName=cluster),
+                       recursive=True)
+
+        return True
+
+    def send_message(self, cluster, path, **kwargs):
+        """Not implemented for the zookeeper client."""
+        raise NotImplementedError
[2/4] helix git commit: Adding license to new files.
Posted by ka...@apache.org.
Adding license to new files.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c5590115
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c5590115
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c5590115
Branch: refs/heads/master
Commit: c55901151237a137ef643143ffbb9dda204dc861
Parents: a714f00
Author: Casey Miller <ca...@linkedin.com>
Authored: Sun Feb 15 11:57:36 2015 -0800
Committer: Casey Miller <ca...@linkedin.com>
Committed: Sun Feb 15 11:57:36 2015 -0800
----------------------------------------------------------------------
.../py-helix-admin/helix/statemodeldefs.py | 18 ++++++++++++++++++
.../py-helix-admin/helix/test/test_helix.py | 18 ++++++++++++++++++
contributors/py-helix-admin/helix/zkfunctions.py | 18 ++++++++++++++++++
3 files changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c5590115/contributors/py-helix-admin/helix/statemodeldefs.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/statemodeldefs.py b/contributors/py-helix-admin/helix/statemodeldefs.py
index 8446ae9..5c0fc8a 100644
--- a/contributors/py-helix-admin/helix/statemodeldefs.py
+++ b/contributors/py-helix-admin/helix/statemodeldefs.py
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
from ordereddict import OrderedDict
# These essentially come from the Java classes defined here. It is cheesy and should probably come from a configuration file.
http://git-wip-us.apache.org/repos/asf/helix/blob/c5590115/contributors/py-helix-admin/helix/test/test_helix.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/test/test_helix.py b/contributors/py-helix-admin/helix/test/test_helix.py
index f466086..752c4e0 100755
--- a/contributors/py-helix-admin/helix/test/test_helix.py
+++ b/contributors/py-helix-admin/helix/test/test_helix.py
@@ -1,4 +1,22 @@
#!/usr/bin/env python2.6
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
from helixexceptions import HelixException
from helixexceptions import HelixAlreadyExistsException
http://git-wip-us.apache.org/repos/asf/helix/blob/c5590115/contributors/py-helix-admin/helix/zkfunctions.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/zkfunctions.py b/contributors/py-helix-admin/helix/zkfunctions.py
index aed2b74..8d389e8 100644
--- a/contributors/py-helix-admin/helix/zkfunctions.py
+++ b/contributors/py-helix-admin/helix/zkfunctions.py
@@ -1,3 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
"""library to handle helix commands"""
# XXX: Explore using zookeeper transactions for some of the write operations into zookeeper.
# Currently, it looks like ensure_path and create with the make_path argument are not supported in transactions so it isn't usable out of the box.
[4/4] helix git commit: Fixing pylint issues with cluster.py.
Posted by ka...@apache.org.
Fixing pylint issues with cluster.py.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/80a4a13f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/80a4a13f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/80a4a13f
Branch: refs/heads/master
Commit: 80a4a13fd9b079e44737f06b7239bf14c7cbf3f1
Parents: 4bb86d8
Author: Casey Miller <ca...@linkedin.com>
Authored: Sun Feb 15 12:10:58 2015 -0800
Committer: Casey Miller <ca...@linkedin.com>
Committed: Sun Feb 15 12:10:58 2015 -0800
----------------------------------------------------------------------
contributors/py-helix-admin/helix/cluster.py | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/80a4a13f/contributors/py-helix-admin/helix/cluster.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/cluster.py b/contributors/py-helix-admin/helix/cluster.py
index 6b45557..e4d8b31 100644
--- a/contributors/py-helix-admin/helix/cluster.py
+++ b/contributors/py-helix-admin/helix/cluster.py
@@ -25,10 +25,10 @@ from resourcegroup import ResourceGroup
from helixexceptions import HelixException
from functions import RestHelixFunctions
try:
- from zkfunctions import ZookeeperHelixFunctions
- zookeeper_ok = True
+ from zkfunctions import ZookeeperHelixFunctions
+ zookeeper_ok = True
except ImportError:
- zookeeper_ok = False
+ zookeeper_ok = False
class BaseCluster(object):
@@ -39,6 +39,7 @@ class BaseCluster(object):
def __init__(self, cluster):
super(BaseCluster, self).__init__()
self.cluster = cluster
+ self.functions = None
# dynamically loaded data below
self._partitions = {}
@@ -367,9 +368,9 @@ class BaseCluster(object):
class Cluster(BaseCluster):
def __init__(self, host, cluster):
- super(Cluster, self).__init__(cluster)
- self.host = host
- self.functions = RestHelixFunctions(host)
+ super(Cluster, self).__init__(cluster)
+ self.host = host
+ self.functions = RestHelixFunctions(host)
class ZKCluster(BaseCluster):
@@ -378,7 +379,7 @@ class ZKCluster(BaseCluster):
# We want to fail if kazoo cannot be found, but only if using the zookeeper object.
if not zookeeper_ok:
- raise ImportError
+ raise ImportError
self.zookeeper_connect_string = zookeeper_connect_string
self.zookeeper_root = zookeeper_root
[3/4] helix git commit: Fixing spacing on multi line arguments.
Posted by ka...@apache.org.
Fixing spacing on multi line arguments.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4bb86d84
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4bb86d84
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4bb86d84
Branch: refs/heads/master
Commit: 4bb86d84673124b1b6c9474d02e5825edc44e22b
Parents: c559011
Author: Casey Miller <ca...@linkedin.com>
Authored: Sun Feb 15 12:02:41 2015 -0800
Committer: Casey Miller <ca...@linkedin.com>
Committed: Sun Feb 15 12:02:41 2015 -0800
----------------------------------------------------------------------
contributors/py-helix-admin/helix/cluster.py | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/4bb86d84/contributors/py-helix-admin/helix/cluster.py
----------------------------------------------------------------------
diff --git a/contributors/py-helix-admin/helix/cluster.py b/contributors/py-helix-admin/helix/cluster.py
index 175bff8..6b45557 100644
--- a/contributors/py-helix-admin/helix/cluster.py
+++ b/contributors/py-helix-admin/helix/cluster.py
@@ -59,7 +59,7 @@ class BaseCluster(object):
try:
for cur_resource in self.functions.get_resource_groups(self.cluster):
data = self.functions.get_resource_group(self.cluster,
- cur_resource)
+ cur_resource)
name = data["id"]
count = data["simpleFields"]["NUM_PARTITIONS"]
replicas = data["simpleFields"]["REPLICAS"]
@@ -132,7 +132,7 @@ class BaseCluster(object):
self._partitions = {}
for resource in self.resources:
newstate = self.functions.get_ideal_state(self.cluster,
- resource)
+ resource)
self._partitions[resource] = {}
if newstate:
for part in newstate:
@@ -201,7 +201,7 @@ class BaseCluster(object):
if isinstance(resource, ResourceGroup):
rname = resource.name
return self.functions.set_config(self.cluster, config,
- resource=rname)
+ resource=rname)
def set_participant_config(self, config, participant):
pname = participant
@@ -209,12 +209,12 @@ class BaseCluster(object):
pname = participant.ident
""" set given configs in helix"""
return self.functions.set_config(self.cluster, config,
- participant=pname)
+ participant=pname)
def activate_cluster(self, grand, enabled=True):
"""activate this cluster with the specified grand cluster"""
return self.functions.activate_cluster(self.cluster, grand,
- enabled)
+ enabled)
def deactivate_cluster(self, grand):
"""deactivate this cluster against the given grandcluster"""
@@ -236,7 +236,7 @@ class BaseCluster(object):
def add_resource(self, resource, partitions, state_model_def, mode=""):
"""add resource to cluster"""
return self.functions.add_resource(self.cluster, resource,
- partitions, state_model_def, mode)
+ partitions, state_model_def, mode)
def enable_instance(self, instance, enabled=True):
"""enable instance, assumes instance a participant object"""
@@ -248,7 +248,7 @@ class BaseCluster(object):
else:
raise HelixException("Instance must be a string or participant")
return self.functions.enable_instance(self.cluster, ident,
- enabled)
+ enabled)
def disable_instance(self, instance):
"""disable instance, assumes instance is a participant object"""
@@ -294,7 +294,7 @@ class BaseCluster(object):
"Resource must be a string or a resource group object")
return self.functions.enable_resource(self.cluster,
- resource_name, enabled)
+ resource_name, enabled)
def disable_resource(self, resource):
"""disable given function"""
@@ -311,7 +311,7 @@ class BaseCluster(object):
raise HelixException("Resource must be resource object or string")
return self.functions.add_resource_tag(self.cluster,
- resource_name, tag)
+ resource_name, tag)
# del resource not yet available in api
# def del_resource_tag(self, resource, tag):