You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by cf...@apache.org on 2021/10/03 18:49:45 UTC
[mesos] branch master updated: remove tasks limit and condition.
add global functions.
This is an automated email from the ASF dual-hosted git repository.
cfnatali pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 067fb9f remove tasks limit and condition. add global functions.
067fb9f is described below
commit 067fb9f512a8638b1070b23462c43aec4ca6f370
Author: Andreas Peters <ap...@aventer.biz>
AuthorDate: Tue Sep 21 12:03:36 2021 +0200
remove tasks limit and condition. add global functions.
remove unneeded empty lines
FIX test script.
REMOVE: not needed newline.
FIX: Missing config.
MODIFY text length.
---
src/python/cli_new/lib/cli/http.py | 14 ++------
src/python/cli_new/lib/cli/mesos.py | 40 ++++++++++++++++-----
src/python/cli_new/lib/cli/tests/agent.py | 3 +-
src/python/cli_new/lib/cli/tests/base.py | 59 +++++++++++++++----------------
4 files changed, 65 insertions(+), 51 deletions(-)
diff --git a/src/python/cli_new/lib/cli/http.py b/src/python/cli_new/lib/cli/http.py
index c39935f..ea1ff89 100644
--- a/src/python/cli_new/lib/cli/http.py
+++ b/src/python/cli_new/lib/cli/http.py
@@ -64,13 +64,11 @@ def read_endpoint(addr, endpoint, config, query=None):
.format(url=url, error=str(exception)))
-def get_json(addr, endpoint, config, condition=None, query=None):
+def get_json(addr, endpoint, config, query=None):
"""
- Return the contents of the 'endpoint' at 'addr' as JSON data
- subject to the condition specified in 'condition'. If we are
- unable to read the data we throw an error.
+ Return the contents of the 'endpoint' at 'addr' as JSON data.
+ If we are unable to read the data we throw an error.
"""
-
data = read_endpoint(addr, endpoint, config, query)
try:
@@ -79,10 +77,4 @@ def get_json(addr, endpoint, config, condition=None, query=None):
raise CLIException("Could not load JSON from '{data}': {error}"
.format(data=data, error=str(exception)))
- if not condition:
- return data
-
- if condition(data):
- return data
-
return data
diff --git a/src/python/cli_new/lib/cli/mesos.py b/src/python/cli_new/lib/cli/mesos.py
index 44a66db..a1055b3 100644
--- a/src/python/cli_new/lib/cli/mesos.py
+++ b/src/python/cli_new/lib/cli/mesos.py
@@ -49,13 +49,8 @@ def get_agent_address(agent_id, master, config):
Given a master and an agent id, return the agent address
by checking the /slaves endpoint of the master.
"""
- try:
- agents = http.get_json(master, "slaves", config)["slaves"]
- except Exception as exception:
- raise CLIException("Could not open '/slaves'"
- " endpoint at '{addr}': {error}"
- .format(addr=master,
- error=exception))
+ agents = get_agents(master, config)
+
for agent in agents:
if agent["id"] == agent_id:
return agent["pid"].split("@")[1]
@@ -68,6 +63,35 @@ def get_agents(master, config):
"""
endpoint = "slaves"
key = "slaves"
+ return get_key_endpoint(key, endpoint, master, config)
+
+def get_framework_address(framework_id, master, config):
+ """
+ Given a master and a framework id, return the framework address
+ by checking the /master/frameworks endpoint of the master.
+ """
+ frameworks = get_frameworks(master, config)
+
+ for framework in frameworks:
+ if framework["id"] == framework_id:
+ return framework["webui_url"]
+ raise CLIException("Unable to find framework '{id}'"
+ .format(id=framework_id))
+
+
+def get_frameworks(master, config):
+ """
+ Get the frameworks in a Mesos cluster.
+ """
+ endpoint = "master/frameworks/"
+ key = "frameworks"
+ return get_key_endpoint(key, endpoint, master, config)
+
+
+def get_key_endpoint(key, endpoint, master, config):
+ """
+ Get the JSON key of the given endpoint.
+ """
try:
data = http.get_json(master, endpoint, config)
except Exception as exception:
@@ -121,7 +145,7 @@ def get_tasks(master, config, query=None):
key = "tasks"
if query is None:
- query = {'order':'asc'}
+ query = {'order':'asc', 'limit':'-1'}
try:
data = http.get_json(master, endpoint, config, query=query)
diff --git a/src/python/cli_new/lib/cli/tests/agent.py b/src/python/cli_new/lib/cli/tests/agent.py
index 8ff6842..57a2742 100644
--- a/src/python/cli_new/lib/cli/tests/agent.py
+++ b/src/python/cli_new/lib/cli/tests/agent.py
@@ -48,7 +48,8 @@ class TestAgentPlugin(CLITestCase):
# Open the master's `/slaves` endpoint and read the
# agents' information ourselves.
- agents = http.get_json(master.addr, None, 'slaves')["slaves"]
+ agents = http.get_json(master.addr, 'slaves',
+ config.Config(None))["slaves"]
self.assertEqual(type(agents), list)
self.assertEqual(len(agents), 1)
diff --git a/src/python/cli_new/lib/cli/tests/base.py b/src/python/cli_new/lib/cli/tests/base.py
index 980c00b..cc2120e 100644
--- a/src/python/cli_new/lib/cli/tests/base.py
+++ b/src/python/cli_new/lib/cli/tests/base.py
@@ -33,7 +33,7 @@ from tenacity import retry
from tenacity import stop_after_delay
from tenacity import wait_fixed
-from cli import http
+from cli import http, config
from cli.tests.constants import TEST_AGENT_IP
from cli.tests.constants import TEST_AGENT_PORT
@@ -182,6 +182,7 @@ class Master(Executable):
self.flags = flags
self.name = "master"
self.addr = "{ip}:{port}".format(ip=flags["ip"], port=flags["port"])
+ self.config = config.Config(None)
self.executable = os.path.join(
CLITestCase.MESOS_BUILD_DIR,
"bin",
@@ -248,6 +249,7 @@ class Agent(Executable):
self.flags = flags
self.name = "agent"
self.addr = "{ip}:{port}".format(ip=flags["ip"], port=flags["port"])
+ self.config = config.Config(None)
self.executable = os.path.join(
CLITestCase.MESOS_BUILD_DIR,
"bin",
@@ -263,7 +265,7 @@ class Agent(Executable):
shutil.rmtree(self.flags["runtime_dir"])
# pylint: disable=arguments-differ
- def launch(self, timeout=TIMEOUT):
+ def launch(self):
"""
After starting the agent, we first need to make sure its
reference count is increased and then check that it has
@@ -272,41 +274,33 @@ class Agent(Executable):
super(Agent, self).launch()
Agent.count += 1
- try:
- # pylint: disable=missing-docstring
- def single_slave(data):
- return len(data["slaves"]) == 1
-
- http.get_json(self.flags["master"], "slaves", single_slave, timeout)
- except Exception as exception:
+ data = http.get_json(self.flags["master"], "slaves", self.config)
+ if len(data["slaves"]) == 1:
stdout = ""
if self.proc.poll():
stdout = "\n{output}".format(output=self.proc.stdout.read())
raise CLIException("Could not get '/slaves' endpoint as JSON with"
- " only 1 agent in it: {error}{stdout}"
- .format(error=exception, stdout=stdout))
+ " only 1 agent in it: {stdout}"
+ .format(stdout=stdout))
# pylint: disable=arguments-differ
- def kill(self, timeout=TIMEOUT):
+ def kill(self):
"""
After killing the agent, we need to make sure it has
successfully unregistered from the master before proceeding.
"""
super(Agent, self).kill()
- try:
- # pylint: disable=missing-docstring
- def one_inactive_slave(data):
- slaves = data["slaves"]
- return len(slaves) == 1 and not slaves[0]["active"]
+ data = http.get_json(self.flags["master"], "slaves", self.config)
+ if len(data["slaves"]) == 1 and not data["slaves"][0]["active"]:
+ stdout = ""
+ if self.proc.poll():
+ stdout = "\n{output}".format(output=self.proc.stdout.read())
- http.get_json(
- self.flags["master"], "slaves", one_inactive_slave, timeout)
- except Exception as exception:
raise CLIException("Could not get '/slaves' endpoint as"
- " JSON with 0 agents in it: {error}"
- .format(error=exception))
+ " JSON with 0 agents in it: {stdout}"
+ .format(stdout=stdout))
Agent.count -= 1
@@ -335,18 +329,19 @@ class Task(Executable):
self.flags = flags
self.name = flags["name"]
+ self.config = config.Config(None)
self.executable = os.path.join(
CLITestCase.MESOS_BUILD_DIR,
"src",
"mesos-execute")
- def __wait_for_containers(self, condition, timeout=TIMEOUT):
+ def __wait_for_containers(self, condition):
"""
Wait for the agent's '/containers' endpoint
to return data subject to 'condition'.
"""
try:
- data = http.get_json(self.flags["master"], None, "slaves")
+ data = http.get_json(self.flags["master"], "slaves", self.config)
except Exception as exception:
raise CLIException("Could not get '/slaves' endpoint"
" as JSON: {error}"
@@ -368,15 +363,16 @@ class Task(Executable):
data = http.get_json(
agent["addr"],
"containers",
- condition,
- timeout)
+ self.config)
+
+ condition(data)
except Exception as exception:
raise CLIException("Could not get '/containers' endpoint as"
" JSON subject to condition: {error}"
.format(error=exception))
# pylint: disable=arguments-differ
- def launch(self, timeout=TIMEOUT):
+ def launch(self):
"""
After starting the task, we need to make sure its container
has actually been added to the agent before proceeding.
@@ -390,7 +386,7 @@ class Task(Executable):
return any(container["executor_id"] == self.flags["name"]
for container in data)
- self.__wait_for_containers(container_exists, timeout)
+ self.__wait_for_containers(container_exists)
except Exception as exception:
stdout = ""
if self.proc.poll():
@@ -404,7 +400,7 @@ class Task(Executable):
stdout=stdout))
# pylint: disable=arguments-differ
- def kill(self, timeout=TIMEOUT):
+ def kill(self):
"""
After killing the task, we need to make sure its container has
actually been removed from the agent before proceeding.
@@ -417,7 +413,7 @@ class Task(Executable):
return not any(container["executor_id"] == self.flags["name"]
for container in data)
- self.__wait_for_containers(container_does_not_exist, timeout)
+ self.__wait_for_containers(container_does_not_exist)
except Exception as exception:
raise CLIException("Container with name '{name}' still"
" exists after timeout: {error}"
@@ -510,7 +506,8 @@ def wait_for_task(master, name, state, delay=1):
"""
@retry(wait=wait_fixed(0.2), stop=stop_after_delay(delay))
def _wait_for_task():
- tasks = http.get_json(master.addr, None, "tasks")["tasks"]
+ tasks = http.get_json(master.addr, "tasks",
+ config.Config(None))["tasks"]
for task in tasks:
if task["name"] == name and task["state"] == state:
return task