You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by cf...@apache.org on 2021/10/03 18:49:45 UTC

[mesos] branch master updated: remove tasks limit and condition. add global functions.

This is an automated email from the ASF dual-hosted git repository.

cfnatali pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 067fb9f  remove tasks limit and condition. add global functions.
067fb9f is described below

commit 067fb9f512a8638b1070b23462c43aec4ca6f370
Author: Andreas Peters <ap...@aventer.biz>
AuthorDate: Tue Sep 21 12:03:36 2021 +0200

    remove tasks limit and condition. add global functions.
    
    remove unneeded empty lines
    
    FIX test script.
    
    REMOVE: not needed newline.
    
    FIX: Missing config.
    
    MODIFY text length.
---
 src/python/cli_new/lib/cli/http.py        | 14 ++------
 src/python/cli_new/lib/cli/mesos.py       | 40 ++++++++++++++++-----
 src/python/cli_new/lib/cli/tests/agent.py |  3 +-
 src/python/cli_new/lib/cli/tests/base.py  | 59 +++++++++++++++----------------
 4 files changed, 65 insertions(+), 51 deletions(-)

diff --git a/src/python/cli_new/lib/cli/http.py b/src/python/cli_new/lib/cli/http.py
index c39935f..ea1ff89 100644
--- a/src/python/cli_new/lib/cli/http.py
+++ b/src/python/cli_new/lib/cli/http.py
@@ -64,13 +64,11 @@ def read_endpoint(addr, endpoint, config, query=None):
                            .format(url=url, error=str(exception)))
 
 
-def get_json(addr, endpoint, config, condition=None, query=None):
+def get_json(addr, endpoint, config, query=None):
     """
-    Return the contents of the 'endpoint' at 'addr' as JSON data
-    subject to the condition specified in 'condition'. If we are
-    unable to read the data we throw an error.
+    Return the contents of the 'endpoint' at 'addr' as JSON data.
+    If we are unable to read the data we throw an error.
     """
-
     data = read_endpoint(addr, endpoint, config, query)
 
     try:
@@ -79,10 +77,4 @@ def get_json(addr, endpoint, config, condition=None, query=None):
         raise CLIException("Could not load JSON from '{data}': {error}"
                            .format(data=data, error=str(exception)))
 
-    if not condition:
-        return data
-
-    if condition(data):
-        return data
-
     return data
diff --git a/src/python/cli_new/lib/cli/mesos.py b/src/python/cli_new/lib/cli/mesos.py
index 44a66db..a1055b3 100644
--- a/src/python/cli_new/lib/cli/mesos.py
+++ b/src/python/cli_new/lib/cli/mesos.py
@@ -49,13 +49,8 @@ def get_agent_address(agent_id, master, config):
     Given a master and an agent id, return the agent address
     by checking the /slaves endpoint of the master.
     """
-    try:
-        agents = http.get_json(master, "slaves", config)["slaves"]
-    except Exception as exception:
-        raise CLIException("Could not open '/slaves'"
-                           " endpoint at '{addr}': {error}"
-                           .format(addr=master,
-                                   error=exception))
+    agents = get_agents(master, config)
+
     for agent in agents:
         if agent["id"] == agent_id:
             return agent["pid"].split("@")[1]
@@ -68,6 +63,35 @@ def get_agents(master, config):
     """
     endpoint = "slaves"
     key = "slaves"
+    return get_key_endpoint(key, endpoint, master, config)
+
+def get_framework_address(framework_id, master, config):
+    """
+    Given a master and a framework id, return the framework address
+    by checking the /master/frameworks endpoint of the master.
+    """
+    frameworks = get_frameworks(master, config)
+
+    for framework in frameworks:
+        if framework["id"] == framework_id:
+            return framework["webui_url"]
+    raise CLIException("Unable to find framework '{id}'"
+                       .format(id=framework_id))
+
+
+def get_frameworks(master, config):
+    """
+    Get the frameworks in a Mesos cluster.
+    """
+    endpoint = "master/frameworks/"
+    key = "frameworks"
+    return get_key_endpoint(key, endpoint, master, config)
+
+
+def get_key_endpoint(key, endpoint, master, config):
+    """
+    Get the json key of the given endpoint
+    """
     try:
         data = http.get_json(master, endpoint, config)
     except Exception as exception:
@@ -121,7 +145,7 @@ def get_tasks(master, config, query=None):
     key = "tasks"
 
     if query is None:
-        query = {'order':'asc'}
+        query = {'order':'asc', 'limit':'-1'}
 
     try:
         data = http.get_json(master, endpoint, config, query=query)
diff --git a/src/python/cli_new/lib/cli/tests/agent.py b/src/python/cli_new/lib/cli/tests/agent.py
index 8ff6842..57a2742 100644
--- a/src/python/cli_new/lib/cli/tests/agent.py
+++ b/src/python/cli_new/lib/cli/tests/agent.py
@@ -48,7 +48,8 @@ class TestAgentPlugin(CLITestCase):
 
         # Open the master's `/slaves` endpoint and read the
         # agents' information ourselves.
-        agents = http.get_json(master.addr, None, 'slaves')["slaves"]
+        agents = http.get_json(master.addr, 'slaves',
+                               config.Config(None))["slaves"]
 
         self.assertEqual(type(agents), list)
         self.assertEqual(len(agents), 1)
diff --git a/src/python/cli_new/lib/cli/tests/base.py b/src/python/cli_new/lib/cli/tests/base.py
index 980c00b..cc2120e 100644
--- a/src/python/cli_new/lib/cli/tests/base.py
+++ b/src/python/cli_new/lib/cli/tests/base.py
@@ -33,7 +33,7 @@ from tenacity import retry
 from tenacity import stop_after_delay
 from tenacity import wait_fixed
 
-from cli import http
+from cli import http, config
 
 from cli.tests.constants import TEST_AGENT_IP
 from cli.tests.constants import TEST_AGENT_PORT
@@ -182,6 +182,7 @@ class Master(Executable):
         self.flags = flags
         self.name = "master"
         self.addr = "{ip}:{port}".format(ip=flags["ip"], port=flags["port"])
+        self.config = config.Config(None)
         self.executable = os.path.join(
             CLITestCase.MESOS_BUILD_DIR,
             "bin",
@@ -248,6 +249,7 @@ class Agent(Executable):
         self.flags = flags
         self.name = "agent"
         self.addr = "{ip}:{port}".format(ip=flags["ip"], port=flags["port"])
+        self.config = config.Config(None)
         self.executable = os.path.join(
             CLITestCase.MESOS_BUILD_DIR,
             "bin",
@@ -263,7 +265,7 @@ class Agent(Executable):
             shutil.rmtree(self.flags["runtime_dir"])
 
     # pylint: disable=arguments-differ
-    def launch(self, timeout=TIMEOUT):
+    def launch(self):
         """
         After starting the agent, we first need to make sure its
         reference count is increased and then check that it has
@@ -272,41 +274,33 @@ class Agent(Executable):
         super(Agent, self).launch()
         Agent.count += 1
 
-        try:
-            # pylint: disable=missing-docstring
-            def single_slave(data):
-                return len(data["slaves"]) == 1
-
-            http.get_json(self.flags["master"], "slaves", single_slave, timeout)
-        except Exception as exception:
+        data = http.get_json(self.flags["master"], "slaves", self.config)
+        if len(data["slaves"]) == 1:
             stdout = ""
             if self.proc.poll():
                 stdout = "\n{output}".format(output=self.proc.stdout.read())
 
             raise CLIException("Could not get '/slaves' endpoint as JSON with"
-                               " only 1 agent in it: {error}{stdout}"
-                               .format(error=exception, stdout=stdout))
+                               " only 1 agent in it: {stdout}"
+                               .format(stdout=stdout))
 
     # pylint: disable=arguments-differ
-    def kill(self, timeout=TIMEOUT):
+    def kill(self):
         """
         After killing the agent, we need to make sure it has
         successfully unregistered from the master before proceeding.
         """
         super(Agent, self).kill()
 
-        try:
-            # pylint: disable=missing-docstring
-            def one_inactive_slave(data):
-                slaves = data["slaves"]
-                return len(slaves) == 1 and not slaves[0]["active"]
+        data = http.get_json(self.flags["master"], "slaves", self.config)
+        if len(data["slaves"]) == 1 and not data["slaves"][0]["active"]:
+            stdout = ""
+            if self.proc.poll():
+                stdout = "\n{output}".format(output=self.proc.stdout.read())
 
-            http.get_json(
-                self.flags["master"], "slaves", one_inactive_slave, timeout)
-        except Exception as exception:
             raise CLIException("Could not get '/slaves' endpoint as"
-                               " JSON with 0 agents in it: {error}"
-                               .format(error=exception))
+                               " JSON with 0 agents in it: {stdout}"
+                               .format(stdout=stdout))
 
         Agent.count -= 1
 
@@ -335,18 +329,19 @@ class Task(Executable):
 
         self.flags = flags
         self.name = flags["name"]
+        self.config = config.Config(None)
         self.executable = os.path.join(
             CLITestCase.MESOS_BUILD_DIR,
             "src",
             "mesos-execute")
 
-    def __wait_for_containers(self, condition, timeout=TIMEOUT):
+    def __wait_for_containers(self, condition):
         """
         Wait for the agent's '/containers' endpoint
         to return data subject to 'condition'.
         """
         try:
-            data = http.get_json(self.flags["master"], None, "slaves")
+            data = http.get_json(self.flags["master"], "slaves", self.config)
         except Exception as exception:
             raise CLIException("Could not get '/slaves' endpoint"
                                " as JSON: {error}"
@@ -368,15 +363,16 @@ class Task(Executable):
             data = http.get_json(
                 agent["addr"],
                 "containers",
-                condition,
-                timeout)
+                self.config)
+
+            condition(data)
         except Exception as exception:
             raise CLIException("Could not get '/containers' endpoint as"
                                " JSON subject to condition: {error}"
                                .format(error=exception))
 
     # pylint: disable=arguments-differ
-    def launch(self, timeout=TIMEOUT):
+    def launch(self):
         """
         After starting the task, we need to make sure its container
         has actually been added to the agent before proceeding.
@@ -390,7 +386,7 @@ class Task(Executable):
                 return any(container["executor_id"] == self.flags["name"]
                            for container in data)
 
-            self.__wait_for_containers(container_exists, timeout)
+            self.__wait_for_containers(container_exists)
         except Exception as exception:
             stdout = ""
             if self.proc.poll():
@@ -404,7 +400,7 @@ class Task(Executable):
                                        stdout=stdout))
 
     # pylint: disable=arguments-differ
-    def kill(self, timeout=TIMEOUT):
+    def kill(self):
         """
         After killing the task, we need to make sure its container has
         actually been removed from the agent before proceeding.
@@ -417,7 +413,7 @@ class Task(Executable):
                 return not any(container["executor_id"] == self.flags["name"]
                                for container in data)
 
-            self.__wait_for_containers(container_does_not_exist, timeout)
+            self.__wait_for_containers(container_does_not_exist)
         except Exception as exception:
             raise CLIException("Container with name '{name}' still"
                                " exists after timeout: {error}"
@@ -510,7 +506,8 @@ def wait_for_task(master, name, state, delay=1):
     """
     @retry(wait=wait_fixed(0.2), stop=stop_after_delay(delay))
     def _wait_for_task():
-        tasks = http.get_json(master.addr, None, "tasks")["tasks"]
+        tasks = http.get_json(master.addr, "tasks",
+                              config.Config(None))["tasks"]
         for task in tasks:
             if task["name"] == name and task["state"] == state:
                 return task