You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by jc...@apache.org on 2016/09/15 18:48:20 UTC

aurora git commit: Ensure shell health checkers running for tasks running under an isolated filesystem are run within that filesystem.

Repository: aurora
Updated Branches:
  refs/heads/master 593233857 -> 783baaefb


Ensure shell health checkers running for tasks running under an isolated filesystem are run within
that filesystem.

Reviewed at https://reviews.apache.org/r/51899/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/783baaef
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/783baaef
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/783baaef

Branch: refs/heads/master
Commit: 783baaefb9a814ca01fad78181fe3df3de5b34af
Parents: 5932338
Author: Joshua Cohen <jc...@apache.org>
Authored: Thu Sep 15 13:48:05 2016 -0500
Committer: Joshua Cohen <jc...@apache.org>
Committed: Thu Sep 15 13:48:05 2016 -0500

----------------------------------------------------------------------
 .../apache/aurora/common/health_check/shell.py  | 29 ++++++++--
 .../executor/bin/thermos_executor_main.py       |  4 +-
 .../aurora/executor/common/health_checker.py    | 41 +++++++++----
 src/main/python/apache/thermos/common/BUILD     |  1 +
 .../apache/thermos/common/process_util.py       | 44 ++++++++++++++
 src/main/python/apache/thermos/core/process.py  | 25 ++------
 .../aurora/common/health_check/test_shell.py    | 21 ++++++-
 .../executor/common/test_health_checker.py      | 61 +++++++++++++++++++-
 .../apache/aurora/e2e/http/http_example.aurora  | 12 +++-
 9 files changed, 194 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/aurora/common/health_check/shell.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/health_check/shell.py b/src/main/python/apache/aurora/common/health_check/shell.py
index 3575082..6ac9021 100644
--- a/src/main/python/apache/aurora/common/health_check/shell.py
+++ b/src/main/python/apache/aurora/common/health_check/shell.py
@@ -24,19 +24,40 @@ else:
   import subprocess
 
 
+class WrappedCalledProcessError(subprocess.CalledProcessError):
+  """
+  Wraps a CalledProcessError but overrides the command so that in the event it was run through an
+  isolator, the original command is exposed to the user, rather than the isolated value.
+  """
+
+  def __init__(self, original_command, error):
+    self.cmd = original_command
+    self.returncode = error.returncode
+    self.output = error.output
+
+
 class ShellHealthCheck(object):
 
-  def __init__(self, cmd, preexec_fn=None, timeout_secs=None):
+  def __init__(
+        self,
+        cmd,
+        preexec_fn=None,
+        timeout_secs=None,
+        wrapper_fn=None):
+
     """
-    Initialize with the commmand we would like to call.
+    Initialize with the command we would like to call.
     :param cmd: Command to execute that is expected to have a 0 return code on success.
     :type cmd: str
     :param preexec_fn: Callable to invoke just before the child shell process is executed.
     :type preexec_fn: callable
     :param timeout_secs: Timeout in seconds.
     :type timeout_secs: int
+    :param wrapper_fn: Callable to invoke that wraps the shell command for filesystem isolation.
+    :type wrapper_fn: callable
     """
-    self._cmd = cmd
+    self._original_cmd = cmd
+    self._cmd = cmd if wrapper_fn is None else wrapper_fn(cmd)
     self._preexec_fn = preexec_fn
     self._timeout_secs = timeout_secs
 
@@ -56,7 +77,7 @@ class ShellHealthCheck(object):
       return True, None
     except subprocess.CalledProcessError as reason:
       # The command didn't return a 0 so provide reason for failure.
-      return False, str(reason)
+      return False, str(WrappedCalledProcessError(self._original_cmd, reason))
     except subprocess.TimeoutExpired:
       return False, 'Health check timed out.'
     except OSError as e:

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 5211f28..c6c0898 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -218,7 +218,9 @@ def initialize(options):
 
   # status providers:
   status_providers = [
-      HealthCheckerProvider(nosetuid_health_checks=options.nosetuid_health_checks),
+      HealthCheckerProvider(
+          nosetuid_health_checks=options.nosetuid_health_checks,
+          mesos_containerizer_path=options.mesos_containerizer_path),
       ResourceManagerProvider(checkpoint_root=checkpoint_root)
   ]
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
index 5fc845e..3c7c09d 100644
--- a/src/main/python/apache/aurora/executor/common/health_checker.py
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -12,7 +12,7 @@
 # limitations under the License.
 #
 
-import os.path
+import os
 import pwd
 import threading
 import time
@@ -27,6 +27,7 @@ from twitter.common.metrics import LambdaGauge
 from apache.aurora.common.health_check.http_signaler import HttpSignaler
 from apache.aurora.common.health_check.shell import ShellHealthCheck
 from apache.aurora.config.schema.base import MesosContext
+from apache.thermos.common.process_util import wrap_with_mesos_containerizer
 from apache.thermos.config.schema import ThermosContext
 
 from .status_checker import StatusChecker, StatusCheckerProvider, StatusResult
@@ -208,8 +209,9 @@ class HealthChecker(StatusChecker):
 
 class HealthCheckerProvider(StatusCheckerProvider):
 
-  def __init__(self, nosetuid_health_checks=False):
-    self.nosetuid_health_checks = nosetuid_health_checks
+  def __init__(self, nosetuid_health_checks=False, mesos_containerizer_path=None):
+    self._nosetuid_health_checks = nosetuid_health_checks
+    self._mesos_containerizer_path = mesos_containerizer_path
 
   @staticmethod
   def interpolate_cmd(task, cmd):
@@ -241,24 +243,41 @@ class HealthCheckerProvider(StatusCheckerProvider):
     timeout_secs = health_check_config.get('timeout_secs')
     if SHELL_HEALTH_CHECK in health_checker:
       shell_command = health_checker.get(SHELL_HEALTH_CHECK, {}).get('shell_command')
-      # Filling in variables eg thermos.ports[http] that could have been passed in as part of
+
+      # Filling in variables e.g. thermos.ports[http] that could have been passed in as part of
       # shell_command.
       interpolated_command = HealthCheckerProvider.interpolate_cmd(
         task=assigned_task,
-        cmd=shell_command
-      )
-      # If we do not want user which is job's role to execute the health shell check
-      # --nosetuid-health-checks should be passed in as an argument to the executor.
+        cmd=shell_command)
+
+      # If we do not want the health check to execute as the user from the job's role
+      # --nosetuid-health-checks should be passed as an argument to the executor.
       demote_to_job_role_user = None
-      if not self.nosetuid_health_checks:
+      if not self._nosetuid_health_checks and not sandbox.is_filesystem_image:
         pw_entry = pwd.getpwnam(assigned_task.task.job.role)
         def demote_to_job_role_user():
           os.setgid(pw_entry.pw_gid)
           os.setuid(pw_entry.pw_uid)
 
-      shell_signaler = ShellHealthCheck(cmd=interpolated_command,
+      # If the task is executing in an isolated filesystem we'll want to wrap the health check
+      # command within a mesos-containerizer invocation so that it's executed within that
+      # filesystem.
+      wrapper = None
+      if sandbox.is_filesystem_image:
+        health_check_user = (os.getusername() if self._nosetuid_health_checks
+            else assigned_task.task.job.role)
+        def wrapper(cmd):
+          return wrap_with_mesos_containerizer(
+              cmd,
+              health_check_user,
+              sandbox.container_root,
+              self._mesos_containerizer_path)
+
+      shell_signaler = ShellHealthCheck(
+        cmd=interpolated_command,
         preexec_fn=demote_to_job_role_user,
-        timeout_secs=timeout_secs)
+        timeout_secs=timeout_secs,
+        wrapper_fn=wrapper)
       a_health_checker = lambda: shell_signaler()
     else:
       portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/thermos/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/BUILD b/src/main/python/apache/thermos/common/BUILD
index 879b812..0adabbf 100644
--- a/src/main/python/apache/thermos/common/BUILD
+++ b/src/main/python/apache/thermos/common/BUILD
@@ -22,6 +22,7 @@ python_library(
     '3rdparty/python:pystachio',
     '3rdparty/python:twitter.common.log',
     '3rdparty/python:twitter.common.recordio',
+    'api/src/main/thrift/org/apache/aurora/gen',
     'api/src/main/thrift/org/apache/thermos'
   ],
   provides = setup_py(

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/thermos/common/process_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/common/process_util.py b/src/main/python/apache/thermos/common/process_util.py
new file mode 100644
index 0000000..abd2c0e
--- /dev/null
+++ b/src/main/python/apache/thermos/common/process_util.py
@@ -0,0 +1,44 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+
+from gen.apache.aurora.api.constants import TASK_FILESYSTEM_MOUNT_POINT
+
+
+def wrap_with_mesos_containerizer(cmdline, user, cwd, mesos_containerizer_path):
+  # We're going to embed this in JSON, so we must escape quotes and newlines.
+  cmdline = cmdline.replace('"', '\\"').replace('\n', '\\n')
+
+  # We must wrap the command in single quotes otherwise the shell that executes
+  # mesos-containerizer will expand any bash variables in the cmdline. Escaping single quotes in
+  # bash is hard: https://github.com/koalaman/shellcheck/wiki/SC1003.
+  bash_wrapper = "/bin/bash -c '\\''%s'\\''"
+
+  # The shell: true below shouldn't be necessary. Since we're just invoking bash anyway, using it
+  # results in a process like: `sh -c /bin/bash -c ...`, however in my testing no combination of
+  # shell: false and splitting the bash/cmdline args across value/arguments produced an invocation
+  # that actually worked. That said, it *should* be possible.
+  # TODO(jcohen): Investigate setting shell:false further.
+  return ('%s launch '
+          '--unshare_namespace_mnt '
+          '--working_directory=%s '
+          '--rootfs=%s '
+          '--user=%s '
+          '--command=\'{"shell":true,"value":"%s"}\'' % (
+              mesos_containerizer_path,
+              cwd,
+              os.path.join(os.environ['MESOS_DIRECTORY'], TASK_FILESYSTEM_MOUNT_POINT),
+              user,
+              bash_wrapper % cmdline))

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index 2134d4f..3ec43e2 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -39,6 +39,8 @@ from twitter.common.lang import Interface
 from twitter.common.quantity import Amount, Data, Time
 from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter
 
+from apache.thermos.common.process_util import wrap_with_mesos_containerizer
+
 from gen.apache.aurora.api.constants import TASK_FILESYSTEM_MOUNT_POINT
 from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt
 
@@ -386,27 +388,8 @@ class Process(ProcessBase):
     if self._mesos_containerizer_path is None:
       return ['/bin/bash', '-c', cmdline]
 
-    # We're going to embed this in JSON, so we must escape quotes and newlines.
-    cmdline = cmdline.replace('"', '\\"').replace('\n', '\\n')
-
-    # We must wrap the command in single quotes otherwise the shell that executes
-    # mesos-containerizer will expand any bash variables in the cmdline. Escaping single quotes in
-    # bash is hard: https://github.com/koalaman/shellcheck/wiki/SC1003.
-    bash_wrapper = "/bin/bash -c '\\''%s'\\''"
-
-    wrapped = ('%s launch '
-               '--unshare_namespace_mnt '
-               '--working_directory=%s '
-               '--rootfs=%s '
-               '--user=%s '
-               '--command=\'{"shell":true,"value":"%s"}\'' % (
-                   self._mesos_containerizer_path,
-                   cwd,
-                   os.path.join(os.environ['MESOS_DIRECTORY'], TASK_FILESYSTEM_MOUNT_POINT),
-                   self._user,
-                   bash_wrapper % cmdline))
-
-    return shlex.split(wrapped)
+    return shlex.split(
+        wrap_with_mesos_containerizer(cmdline, self._user, cwd, self._mesos_containerizer_path))
 
   def execute(self):
     """Perform final initialization and launch target process commandline in a subprocess."""

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/test/python/apache/aurora/common/health_check/test_shell.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/common/health_check/test_shell.py b/src/test/python/apache/aurora/common/health_check/test_shell.py
index 011464c..4f02878 100644
--- a/src/test/python/apache/aurora/common/health_check/test_shell.py
+++ b/src/test/python/apache/aurora/common/health_check/test_shell.py
@@ -41,12 +41,27 @@ class TestHealthChecker(unittest.TestCase):
 
   @mock.patch('subprocess32.check_call', autospec=True)
   def test_health_check_failed(self, mock_popen):
+    cmd = 'failed'
     # Fail due to command returning a non-0 exit status.
-    mock_popen.side_effect = subprocess.CalledProcessError(1, 'failed')
+    mock_popen.side_effect = subprocess.CalledProcessError(1, cmd)
 
-    shell = ShellHealthCheck('cmd', timeout_secs=30)
+    shell = ShellHealthCheck(cmd, timeout_secs=30)
     success, msg = shell()
-    mock_popen.assert_called_once_with('cmd', shell=True, timeout=30, preexec_fn=mock.ANY)
+    mock_popen.assert_called_once_with(cmd, shell=True, timeout=30, preexec_fn=mock.ANY)
+
+    self.assertFalse(success)
+    self.assertEqual(msg, "Command 'failed' returned non-zero exit status 1")
+
+  @mock.patch('subprocess32.check_call', autospec=True)
+  def test_health_check_failed_with_wrapper(self, mock_popen):
+    cmd = 'failed'
+    mock_popen.side_effect = subprocess.CalledProcessError(1, cmd)
+
+    shell = ShellHealthCheck(cmd, timeout_secs=30, wrapper_fn=lambda c: 'wrapped: %s' % c)
+    success, msg = shell()
+    self.assertEqual(
+        mock_popen.mock_calls,
+        [mock.call('wrapped: %s' % cmd, shell=True, timeout=30, preexec_fn=mock.ANY)])
 
     self.assertFalse(success)
     self.assertEqual(msg, "Command 'failed' returned non-zero exit status 1")

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/test/python/apache/aurora/executor/common/test_health_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py
index bb6ea69..da0c56c 100644
--- a/src/test/python/apache/aurora/executor/common/test_health_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py
@@ -275,7 +275,12 @@ class TestHealthCheckerProvider(unittest.TestCase):
     execconfig_data = json.loads(assigned_task.task.executorConfig.data)
     assert execconfig_data[
              'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command'
-    health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None)
+
+    mock_sandbox = mock.Mock(spec_set=SandboxInterface)
+    type(mock_sandbox).root = mock.PropertyMock(return_value='/some/path')
+    type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=False)
+
+    health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, mock_sandbox)
     assert health_checker.threaded_health_checker.interval == interval_secs
     assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
     hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures
@@ -309,8 +314,14 @@ class TestHealthCheckerProvider(unittest.TestCase):
     execconfig_data = json.loads(assigned_task.task.executorConfig.data)
     assert execconfig_data[
              'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command'
+
+    mock_sandbox = mock.Mock(spec_set=SandboxInterface)
+    type(mock_sandbox).root = mock.PropertyMock(return_value='/some/path')
+    type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=False)
+
     health_checker = HealthCheckerProvider(nosetuid_health_checks=True).from_assigned_task(
-      assigned_task, None)
+        assigned_task,
+        mock_sandbox)
     assert health_checker.threaded_health_checker.interval == interval_secs
     assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
     hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures
@@ -318,6 +329,52 @@ class TestHealthCheckerProvider(unittest.TestCase):
     # Should not be trying to access role's user info.
     assert not mock_getpwnam.called
 
+  @mock.patch('pwd.getpwnam')
+  def test_from_assigned_task_shell_filesystem_image(self, mock_getpwnam):
+    interval_secs = 17
+    initial_interval_secs = 3
+    max_consecutive_failures = 2
+    timeout_secs = 5
+    shell_config = ShellHealthChecker(shell_command='failed command')
+    task_config = TaskConfig(
+            job=JobKey(role='role', environment='env', name='name'),
+            executorConfig=ExecutorConfig(
+                    name='thermos-generic',
+                    data=MESOS_JOB(
+                            task=HELLO_WORLD,
+                            health_check_config=HealthCheckConfig(
+                                    health_checker=HealthCheckerConfig(shell=shell_config),
+                                    interval_secs=interval_secs,
+                                    initial_interval_secs=initial_interval_secs,
+                                    max_consecutive_failures=max_consecutive_failures,
+                                    timeout_secs=timeout_secs,
+                            )
+                    ).json_dumps()
+            )
+    )
+    assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'foo': 9001})
+    execconfig_data = json.loads(assigned_task.task.executorConfig.data)
+    assert execconfig_data[
+             'health_check_config']['health_checker']['shell']['shell_command'] == 'failed command'
+
+    mock_sandbox = mock.Mock(spec_set=SandboxInterface)
+    type(mock_sandbox).root = mock.PropertyMock(return_value='/some/path')
+    type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=True)
+
+    with mock.patch('apache.aurora.executor.common.health_checker.ShellHealthCheck') as mock_shell:
+      HealthCheckerProvider(
+          nosetuid_health_checks=False,
+          mesos_containerizer_path='/some/path/mesos-containerizer').from_assigned_task(
+              assigned_task,
+              mock_sandbox)
+
+      class NotNone(object):
+        def __eq__(self, other):
+          return other is not None
+
+      assert mock_shell.mock_calls == [
+          mock.call(cmd='failed command', wrapper_fn=NotNone(), preexec_fn=None, timeout_secs=5.0)]
+
   def test_interpolate_cmd(self):
     """Making sure thermos.ports[foo] gets correctly substituted with assignedPorts info."""
     interval_secs = 17

http://git-wip-us.apache.org/repos/asf/aurora/blob/783baaef/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
index 290627f..c71fb81 100644
--- a/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
+++ b/src/test/sh/org/apache/aurora/e2e/http/http_example.aurora
@@ -73,6 +73,12 @@ no_python_task = SequentialTask(
 
 update_config = UpdateConfig(watch_secs=10, batch_size=2)
 health_check_config = HealthCheckConfig(initial_interval_secs=5, interval_secs=1)
+shell_health_check_config = HealthCheckConfig(
+  health_checker = HealthCheckerConfig(
+    shell = ShellHealthChecker(shell_command='stat /usr/local/bin/run-server.sh')),
+  initial_interval_secs=5,
+  interval_secs=1,
+)
 
 job = Service(
   cluster = 'devcluster',
@@ -106,11 +112,13 @@ jobs = [
   job(
     name = 'http_example_unified_appc',
     container = Mesos(image=AppcImage(name='http_example_netcat', image_id='{{appc_image_id}}')),
-    task = no_python_task
+    task = no_python_task,
+    health_check_config=shell_health_check_config
   ).bind(profile=DefaultProfile()),
   job(
     name = 'http_example_unified_docker',
     container = Mesos(image=DockerImage(name='http_example_netcat', tag='latest')),
-    task = no_python_task
+    task = no_python_task,
+    health_check_config=shell_health_check_config
   ).bind(profile=DefaultProfile())
 ]