You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/23 01:22:49 UTC

aurora git commit: Ensure final processes are executed when ephemeral daemon processes exist.

Repository: aurora
Updated Branches:
  refs/heads/master b6d23afdf -> c66a9eeef


Ensure final processes are executed when ephemeral daemon processes exist.

Bugs closed: AURORA-1642

Reviewed at https://reviews.apache.org/r/45115/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c66a9eee
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c66a9eee
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c66a9eee

Branch: refs/heads/master
Commit: c66a9eeef97ebb0be367f76a3992ef9f17ebbde0
Parents: b6d23af
Author: Amol Deshmukh <am...@apache.org>
Authored: Tue Mar 22 17:22:40 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Mar 22 17:22:40 2016 -0700

----------------------------------------------------------------------
 src/main/python/apache/thermos/core/process.py  | 95 ++++++++++++--------
 .../python/apache/thermos/core/test_process.py  | 26 +++---
 .../e2e/ephemeral_daemon_with_final.aurora      | 47 ++++++++++
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh | 47 +++++++++-
 4 files changed, 165 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index f147af7..1791b5f 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -417,8 +417,13 @@ class Process(ProcessBase):
                                                        mode=self._logger_mode,
                                                        rotate_log_size=self._rotate_log_size,
                                                        rotate_log_backups=self._rotate_log_backups)
-    stdout, stderr = log_destination_resolver.get_handlers()
-    executor = PipedSubprocessExecutor(stdout=stdout,
+    stdout, stderr, handlers_are_files = log_destination_resolver.get_handlers()
+    if handlers_are_files:
+      executor = SubprocessExecutor(stdout=stdout,
+                                    stderr=stderr,
+                                    **subprocess_args)
+    else:
+      executor = PipedSubprocessExecutor(stdout=stdout,
                                        stderr=stderr,
                                        **subprocess_args)
 
@@ -486,9 +491,35 @@ class SubprocessExecutorBase(object):
     raise NotImplementedError()
 
 
+class SubprocessExecutor(SubprocessExecutorBase):
+  """
+  Basic implementation of a SubprocessExecutor that writes stderr/stdout to specified output files.
+  """
+
+  def __init__(self, args, close_fds, cwd, env, pathspec, stdout=None, stderr=None):
+    """
+    See SubprocessExecutorBase.__init__
+
+    Takes additional arguments:
+      stdout = Destination handler for stdout output. Default is /dev/null.
+      stderr = Destination handler for stderr output. Default is /dev/null.
+    """
+    super(SubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
+    self._stderr = stderr
+    self._stdout = stdout
+
+  def start(self):
+    self._popen = self._start_subprocess(self._stderr, self._stdout)
+    return self._popen.pid
+
+  def wait(self):
+    return self._popen.wait()
+
+
 class PipedSubprocessExecutor(SubprocessExecutorBase):
   """
-  Implementation of SubprocessExecutorBase that passes logs to provided destinations
+  Implementation of SubprocessExecutorBase that uses pipes to poll the pipes to output streams and
+  copies them to the specified destinations.
   """
 
   READ_BUFFER_SIZE = 2 ** 16
@@ -537,7 +568,7 @@ class PipedSubprocessExecutor(SubprocessExecutorBase):
 
 class LogDestinationResolver(object):
   """
-  Resolves correct stdout/stderr destinations based on process configuration
+  Resolves correct stdout/stderr destinations based on process configuration.
   """
 
   STDOUT = 'stdout'
@@ -568,11 +599,20 @@ class LogDestinationResolver(object):
     """
     Creates stdout/stderr handler by provided configuration
     """
-    return self._get_handler(self.STDOUT), self._get_handler(self.STDERR)
+    return (self._get_handler(self.STDOUT),
+            self._get_handler(self.STDERR),
+            self._handlers_are_files())
+
+  def _handlers_are_files(self):
+    """
+    Returns True if both the handlers are standard file objects.
+    """
+    return (self._destination == LoggerDestination.CONSOLE or
+        (self._destination == LoggerDestination.FILE and self._mode == LoggerMode.STANDARD))
 
   def _get_handler(self, name):
     """
-    Constructs correct handler by provided configuration
+    Constructs correct handler or file object based on the provided configuration.
     """
 
     # On no destination write logs to /dev/null
@@ -581,7 +621,7 @@ class LogDestinationResolver(object):
 
     # Streamed logs to predefined outputs
     if self._destination == LoggerDestination.CONSOLE:
-      return self._get_stream(name)
+      return sys.stdout if name == self.STDOUT else sys.stderr
 
     # Streaming AND file logs are required
     if self._destination == LoggerDestination.BOTH:
@@ -592,7 +632,7 @@ class LogDestinationResolver(object):
 
   def _get_file(self, name):
     if self._mode == LoggerMode.STANDARD:
-      return FileHandler(self._get_log_path(name))
+      return safe_open(self._get_log_path(name), mode='a')
     if self._mode == LoggerMode.ROTATE:
       log_size = int(self._rotate_log_size.as_(Data.BYTES))
       return RotatingFileHandler(self._get_log_path(name),
@@ -612,19 +652,25 @@ class LogDestinationResolver(object):
     return self._pathspec.with_filename(log_name).getpath('process_logdir')
 
 
-class FileHandler(object):
+class RotatingFileHandler(object):
   """
-  Base file handler.
+  File handler that implements max size/rotation.
   """
 
-  def __init__(self, filename, mode='w'):
+  def __init__(self, filename, max_bytes, max_backups, mode='w'):
     """
       required:
-        filename = The file name.
+        filename    = The file name.
+        max_bytes   = The maximum size of an individual log file.
+        max_backups = The maximum number of log file backups to create.
 
       optional:
         mode = Mode to open the file in.
     """
+    if max_bytes > 0 and max_backups <= 0:
+      raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.')
+    self._max_bytes = max_bytes
+    self._max_backups = max_backups
     self.file = safe_open(filename, mode=mode)
     self.filename = filename
     self.mode = mode
@@ -638,31 +684,6 @@ class FileHandler(object):
   def write(self, b):
     self.file.write(b)
     self.file.flush()
-
-
-class RotatingFileHandler(FileHandler):
-  """
-  File handler that implements max size/rotation.
-  """
-
-  def __init__(self, filename, max_bytes, max_backups, mode='w'):
-    """
-      required:
-        filename    = The file name.
-        max_bytes   = The maximum size of an individual log file.
-        max_backups = The maximum number of log file backups to create.
-
-      optional:
-        mode = Mode to open the file in.
-    """
-    if max_bytes > 0 and max_backups <= 0:
-      raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.')
-    self._max_bytes = max_bytes
-    self._max_backups = max_backups
-    super(RotatingFileHandler, self).__init__(filename, mode)
-
-  def write(self, b):
-    super(RotatingFileHandler, self).write(b)
     if self.should_rollover():
       self.rollover()
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/python/apache/thermos/core/test_process.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py
index c339c91..77f644c 100644
--- a/src/test/python/apache/thermos/core/test_process.py
+++ b/src/test/python/apache/thermos/core/test_process.py
@@ -29,7 +29,6 @@ from twitter.common.recordio import ThriftRecordReader
 
 from apache.thermos.common.path import TaskPath
 from apache.thermos.core.process import (
-    FileHandler,
     LogDestinationResolver,
     LoggerDestination,
     LoggerMode,
@@ -326,29 +325,30 @@ def test_resolver_none_output():
   with temporary_dir() as td:
     taskpath = make_taskpath(td)
     r = LogDestinationResolver(taskpath, destination=LoggerDestination.NONE)
-    stdout, stderr = r.get_handlers()
+    stdout, stderr, handlers_are_files = r.get_handlers()
     assert type(stdout) == StreamHandler
     assert type(stderr) == StreamHandler
+    assert not handlers_are_files
 
 
 def test_resolver_console_output():
   with temporary_dir() as td:
     taskpath = make_taskpath(td)
     r = LogDestinationResolver(taskpath, destination=LoggerDestination.CONSOLE)
-    stdout, stderr = r.get_handlers()
-    assert type(stdout) == StreamHandler
-    assert type(stderr) == StreamHandler
-    assert stdout._stream == sys.stdout
-    assert stderr._stream == sys.stderr
+    stdout, stderr, handlers_are_files = r.get_handlers()
+    assert stdout == sys.stdout
+    assert stderr == sys.stderr
+    assert handlers_are_files
 
 
 def test_resolver_file_output():
   with temporary_dir() as td:
     taskpath = make_taskpath(td)
     r = LogDestinationResolver(taskpath, destination=LoggerDestination.FILE)
-    stdout, stderr = r.get_handlers()
-    assert type(stdout) == FileHandler
-    assert type(stderr) == FileHandler
+    stdout, stderr, handlers_are_files = r.get_handlers()
+    assert type(stdout) == file
+    assert type(stderr) == file
+    assert handlers_are_files
     assert_log_file_exists(taskpath, 'stdout')
     assert_log_file_exists(taskpath, 'stderr')
 
@@ -357,9 +357,10 @@ def test_resolver_both_output():
   with temporary_dir() as td:
     taskpath = make_taskpath(td)
     r = LogDestinationResolver(taskpath, destination=LoggerDestination.BOTH)
-    stdout, stderr = r.get_handlers()
+    stdout, stderr, handlers_are_files = r.get_handlers()
     assert type(stdout) == TeeHandler
     assert type(stderr) == TeeHandler
+    assert not handlers_are_files
     assert_log_file_exists(taskpath, 'stdout')
     assert_log_file_exists(taskpath, 'stderr')
 
@@ -371,9 +372,10 @@ def test_resolver_both_with_rotation_output():
                                mode=LoggerMode.ROTATE,
                                rotate_log_size=Amount(70, Data.BYTES),
                                rotate_log_backups=2)
-    stdout, stderr = r.get_handlers()
+    stdout, stderr, handlers_are_files = r.get_handlers()
     assert type(stdout) == TeeHandler
     assert type(stderr) == TeeHandler
+    assert not handlers_are_files
     assert_log_file_exists(taskpath, 'stdout')
     assert_log_file_exists(taskpath, 'stderr')
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora b/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora
new file mode 100644
index 0000000..6158f28
--- /dev/null
+++ b/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora
@@ -0,0 +1,47 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import getpass
+
+ephemeral_daemon_process = Process(
+  name = 'ephemeral_daemon',
+  daemon = True,
+  ephemeral = True,
+  cmdline = 'echo "ephemeral daemon started"; sleep 3600')
+
+main_process = Process(
+  name = 'main',
+  cmdline = 'while [[ ! -e {{stop_file}} ]]; do sleep 1; done; echo "main OK"')
+
+final_process = Process(
+  name = 'final',
+  final = True,
+  cmdline = 'rm {{stop_file}}; echo "final OK"')
+
+test_task = Task(
+  name = 'ephemeral_daemon_with_final',
+  resources = Resources(cpu=0.4, ram=32*MB, disk=64*MB),
+  processes = [ephemeral_daemon_process, main_process, final_process])
+
+job = Job(
+  cluster = 'devcluster',
+  task = test_task,
+  role = getpass.getuser(),
+  environment = 'test',
+  contact = '{{role}}@localhost',
+)
+
+jobs = [
+  job(name = 'ephemeral_daemon_with_final')
+]

http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index b469f9b..e1c12bb 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -55,6 +55,23 @@ check_url_live() {
   [[ $(curl -sL -w '%{http_code}' $1 -o /dev/null) == 200 ]]
 }
 
+test_file_removed() {
+  local _file=$1
+  local _success=0
+  for i in $(seq 1 10); do
+    if [[ ! -e $_file ]]; then
+      _success=1
+      break
+    fi
+    sleep 1
+  done
+
+  if [[ "$_success" -ne "1" ]]; then
+    echo "File was not removed."
+    exit 1
+  fi
+}
+
 test_version() {
   # The version number is written to stderr, making it necessary to redirect the output.
   [[ $(aurora --version 2>&1) = $(cat /vagrant/.auroraversion) ]]
@@ -79,8 +96,10 @@ test_inspect() {
 
 test_create() {
   local _jobkey=$1 _config=$2
+  shift; shift
+  local _extra_args="${@}"
 
-  aurora job create $_jobkey $_config
+  aurora job create $_jobkey $_config $_extra_args
 }
 
 test_job_status() {
@@ -288,6 +307,20 @@ test_admin() {
   aurora_admin get_scheduler $_cluster | grep ":8081"
 }
 
+test_ephemeral_daemon_with_final() {
+  local _cluster=$1 _role=$2 _env=$3 _job=$4 _config=$5
+  local _jobkey="$_cluster/$_role/$_env/$_job"
+  local _stop_file=$(mktemp)
+  local _extra_args="--bind stop_file=$_stop_file"
+  rm $_stop_file
+
+  test_create $_jobkey $_config $_extra_args
+  test_observer_ui $_cluster $_role $_job
+  test_job_status $_cluster $_role $_env $_job
+  touch $_stop_file  # Stops 'main_process'.
+  test_file_removed $_stop_file  # Removed by 'final_process'.
+}
+
 restore_netrc() {
   mv ~/.netrc.bak ~/.netrc >/dev/null 2>&1 || true
 }
@@ -323,6 +356,8 @@ TEST_JOB_DOCKER=http_example_docker
 TEST_CONFIG_FILE=$EXAMPLE_DIR/http_example.aurora
 TEST_CONFIG_UPDATED_FILE=$EXAMPLE_DIR/http_example_updated.aurora
 TEST_BAD_HEALTHCHECK_CONFIG_UPDATED_FILE=$EXAMPLE_DIR/http_example_bad_healthcheck.aurora
+TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB=ephemeral_daemon_with_final
+TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE=$TEST_ROOT/ephemeral_daemon_with_final.aurora
 
 BASE_ARGS=(
   $TEST_CLUSTER
@@ -341,6 +376,14 @@ TEST_JOB_DOCKER_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_DOCKER")
 
 TEST_ADMIN_ARGS=($TEST_CLUSTER)
 
+TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS=(
+  $TEST_CLUSTER
+  $TEST_ROLE
+  $TEST_ENV
+  $TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB
+  $TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE
+)
+
 trap collect_result EXIT
 
 aurorabuild all
@@ -357,6 +400,8 @@ test_http_example "${TEST_JOB_DOCKER_ARGS[@]}"
 test_admin "${TEST_ADMIN_ARGS[@]}"
 test_basic_auth_unauthenticated  "${TEST_JOB_ARGS[@]}"
 
+test_ephemeral_daemon_with_final "${TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS[@]}"
+
 /vagrant/src/test/sh/org/apache/aurora/e2e/test_kerberos_end_to_end.sh
 /vagrant/src/test/sh/org/apache/aurora/e2e/test_bypass_leader_redirect_end_to_end.sh
 RETCODE=0