You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/23 01:22:49 UTC
aurora git commit: Ensure final processes are executed when ephemeral
daemon processes exist.
Repository: aurora
Updated Branches:
refs/heads/master b6d23afdf -> c66a9eeef
Ensure final processes are executed when ephemeral daemon processes exist.
Bugs closed: AURORA-1642
Reviewed at https://reviews.apache.org/r/45115/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/c66a9eee
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/c66a9eee
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/c66a9eee
Branch: refs/heads/master
Commit: c66a9eeef97ebb0be367f76a3992ef9f17ebbde0
Parents: b6d23af
Author: Amol Deshmukh <am...@apache.org>
Authored: Tue Mar 22 17:22:40 2016 -0700
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Tue Mar 22 17:22:40 2016 -0700
----------------------------------------------------------------------
src/main/python/apache/thermos/core/process.py | 95 ++++++++++++--------
.../python/apache/thermos/core/test_process.py | 26 +++---
.../e2e/ephemeral_daemon_with_final.aurora | 47 ++++++++++
.../sh/org/apache/aurora/e2e/test_end_to_end.sh | 47 +++++++++-
4 files changed, 165 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index f147af7..1791b5f 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -417,8 +417,13 @@ class Process(ProcessBase):
mode=self._logger_mode,
rotate_log_size=self._rotate_log_size,
rotate_log_backups=self._rotate_log_backups)
- stdout, stderr = log_destination_resolver.get_handlers()
- executor = PipedSubprocessExecutor(stdout=stdout,
+ stdout, stderr, handlers_are_files = log_destination_resolver.get_handlers()
+ if handlers_are_files:
+ executor = SubprocessExecutor(stdout=stdout,
+ stderr=stderr,
+ **subprocess_args)
+ else:
+ executor = PipedSubprocessExecutor(stdout=stdout,
stderr=stderr,
**subprocess_args)
@@ -486,9 +491,35 @@ class SubprocessExecutorBase(object):
raise NotImplementedError()
+class SubprocessExecutor(SubprocessExecutorBase):
+ """
+ Basic implementation of a SubprocessExecutor that writes stderr/stdout to specified output files.
+ """
+
+ def __init__(self, args, close_fds, cwd, env, pathspec, stdout=None, stderr=None):
+ """
+ See SubprocessExecutorBase.__init__
+
+ Takes additional arguments:
+ stdout = Destination handler for stdout output. Default is /dev/null.
+ stderr = Destination handler for stderr output. Default is /dev/null.
+ """
+ super(SubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
+ self._stderr = stderr
+ self._stdout = stdout
+
+ def start(self):
+ self._popen = self._start_subprocess(self._stderr, self._stdout)
+ return self._popen.pid
+
+ def wait(self):
+ return self._popen.wait()
+
+
class PipedSubprocessExecutor(SubprocessExecutorBase):
"""
- Implementation of SubprocessExecutorBase that passes logs to provided destinations
+ Implementation of SubprocessExecutorBase that uses pipes to poll the output streams and
+ copies them to the specified destinations.
"""
READ_BUFFER_SIZE = 2 ** 16
@@ -537,7 +568,7 @@ class PipedSubprocessExecutor(SubprocessExecutorBase):
class LogDestinationResolver(object):
"""
- Resolves correct stdout/stderr destinations based on process configuration
+ Resolves correct stdout/stderr destinations based on process configuration.
"""
STDOUT = 'stdout'
@@ -568,11 +599,20 @@ class LogDestinationResolver(object):
"""
Creates stdout/stderr handler by provided configuration
"""
- return self._get_handler(self.STDOUT), self._get_handler(self.STDERR)
+ return (self._get_handler(self.STDOUT),
+ self._get_handler(self.STDERR),
+ self._handlers_are_files())
+
+ def _handlers_are_files(self):
+ """
+ Returns True if both the handlers are standard file objects.
+ """
+ return (self._destination == LoggerDestination.CONSOLE or
+ (self._destination == LoggerDestination.FILE and self._mode == LoggerMode.STANDARD))
def _get_handler(self, name):
"""
- Constructs correct handler by provided configuration
+ Constructs correct handler or file object based on the provided configuration.
"""
# On no destination write logs to /dev/null
@@ -581,7 +621,7 @@ class LogDestinationResolver(object):
# Streamed logs to predefined outputs
if self._destination == LoggerDestination.CONSOLE:
- return self._get_stream(name)
+ return sys.stdout if name == self.STDOUT else sys.stderr
# Streaming AND file logs are required
if self._destination == LoggerDestination.BOTH:
@@ -592,7 +632,7 @@ class LogDestinationResolver(object):
def _get_file(self, name):
if self._mode == LoggerMode.STANDARD:
- return FileHandler(self._get_log_path(name))
+ return safe_open(self._get_log_path(name), mode='a')
if self._mode == LoggerMode.ROTATE:
log_size = int(self._rotate_log_size.as_(Data.BYTES))
return RotatingFileHandler(self._get_log_path(name),
@@ -612,19 +652,25 @@ class LogDestinationResolver(object):
return self._pathspec.with_filename(log_name).getpath('process_logdir')
-class FileHandler(object):
+class RotatingFileHandler(object):
"""
- Base file handler.
+ File handler that implements max size/rotation.
"""
- def __init__(self, filename, mode='w'):
+ def __init__(self, filename, max_bytes, max_backups, mode='w'):
"""
required:
- filename = The file name.
+ filename = The file name.
+ max_bytes = The maximum size of an individual log file.
+ max_backups = The maximum number of log file backups to create.
optional:
mode = Mode to open the file in.
"""
+ if max_bytes > 0 and max_backups <= 0:
+ raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.')
+ self._max_bytes = max_bytes
+ self._max_backups = max_backups
self.file = safe_open(filename, mode=mode)
self.filename = filename
self.mode = mode
@@ -638,31 +684,6 @@ class FileHandler(object):
def write(self, b):
self.file.write(b)
self.file.flush()
-
-
-class RotatingFileHandler(FileHandler):
- """
- File handler that implements max size/rotation.
- """
-
- def __init__(self, filename, max_bytes, max_backups, mode='w'):
- """
- required:
- filename = The file name.
- max_bytes = The maximum size of an individual log file.
- max_backups = The maximum number of log file backups to create.
-
- optional:
- mode = Mode to open the file in.
- """
- if max_bytes > 0 and max_backups <= 0:
- raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.')
- self._max_bytes = max_bytes
- self._max_backups = max_backups
- super(RotatingFileHandler, self).__init__(filename, mode)
-
- def write(self, b):
- super(RotatingFileHandler, self).write(b)
if self.should_rollover():
self.rollover()
http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/python/apache/thermos/core/test_process.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py
index c339c91..77f644c 100644
--- a/src/test/python/apache/thermos/core/test_process.py
+++ b/src/test/python/apache/thermos/core/test_process.py
@@ -29,7 +29,6 @@ from twitter.common.recordio import ThriftRecordReader
from apache.thermos.common.path import TaskPath
from apache.thermos.core.process import (
- FileHandler,
LogDestinationResolver,
LoggerDestination,
LoggerMode,
@@ -326,29 +325,30 @@ def test_resolver_none_output():
with temporary_dir() as td:
taskpath = make_taskpath(td)
r = LogDestinationResolver(taskpath, destination=LoggerDestination.NONE)
- stdout, stderr = r.get_handlers()
+ stdout, stderr, handlers_are_files = r.get_handlers()
assert type(stdout) == StreamHandler
assert type(stderr) == StreamHandler
+ assert not handlers_are_files
def test_resolver_console_output():
with temporary_dir() as td:
taskpath = make_taskpath(td)
r = LogDestinationResolver(taskpath, destination=LoggerDestination.CONSOLE)
- stdout, stderr = r.get_handlers()
- assert type(stdout) == StreamHandler
- assert type(stderr) == StreamHandler
- assert stdout._stream == sys.stdout
- assert stderr._stream == sys.stderr
+ stdout, stderr, handlers_are_files = r.get_handlers()
+ assert stdout == sys.stdout
+ assert stderr == sys.stderr
+ assert handlers_are_files
def test_resolver_file_output():
with temporary_dir() as td:
taskpath = make_taskpath(td)
r = LogDestinationResolver(taskpath, destination=LoggerDestination.FILE)
- stdout, stderr = r.get_handlers()
- assert type(stdout) == FileHandler
- assert type(stderr) == FileHandler
+ stdout, stderr, handlers_are_files = r.get_handlers()
+ assert type(stdout) == file
+ assert type(stderr) == file
+ assert handlers_are_files
assert_log_file_exists(taskpath, 'stdout')
assert_log_file_exists(taskpath, 'stderr')
@@ -357,9 +357,10 @@ def test_resolver_both_output():
with temporary_dir() as td:
taskpath = make_taskpath(td)
r = LogDestinationResolver(taskpath, destination=LoggerDestination.BOTH)
- stdout, stderr = r.get_handlers()
+ stdout, stderr, handlers_are_files = r.get_handlers()
assert type(stdout) == TeeHandler
assert type(stderr) == TeeHandler
+ assert not handlers_are_files
assert_log_file_exists(taskpath, 'stdout')
assert_log_file_exists(taskpath, 'stderr')
@@ -371,9 +372,10 @@ def test_resolver_both_with_rotation_output():
mode=LoggerMode.ROTATE,
rotate_log_size=Amount(70, Data.BYTES),
rotate_log_backups=2)
- stdout, stderr = r.get_handlers()
+ stdout, stderr, handlers_are_files = r.get_handlers()
assert type(stdout) == TeeHandler
assert type(stderr) == TeeHandler
+ assert not handlers_are_files
assert_log_file_exists(taskpath, 'stdout')
assert_log_file_exists(taskpath, 'stderr')
http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora b/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora
new file mode 100644
index 0000000..6158f28
--- /dev/null
+++ b/src/test/sh/org/apache/aurora/e2e/ephemeral_daemon_with_final.aurora
@@ -0,0 +1,47 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import getpass
+
+ephemeral_daemon_process = Process(
+ name = 'ephemeral_daemon',
+ daemon = True,
+ ephemeral = True,
+ cmdline = 'echo "ephemeral daemon started"; sleep 3600')
+
+main_process = Process(
+ name = 'main',
+ cmdline = 'while [[ ! -e {{stop_file}} ]]; do sleep 1; done; echo "main OK"')
+
+final_process = Process(
+ name = 'final',
+ final = True,
+ cmdline = 'rm {{stop_file}}; echo "final OK"')
+
+test_task = Task(
+ name = 'ephemeral_daemon_with_final',
+ resources = Resources(cpu=0.4, ram=32*MB, disk=64*MB),
+ processes = [ephemeral_daemon_process, main_process, final_process])
+
+job = Job(
+ cluster = 'devcluster',
+ task = test_task,
+ role = getpass.getuser(),
+ environment = 'test',
+ contact = '{{role}}@localhost',
+)
+
+jobs = [
+ job(name = 'ephemeral_daemon_with_final')
+]
http://git-wip-us.apache.org/repos/asf/aurora/blob/c66a9eee/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
----------------------------------------------------------------------
diff --git a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
index b469f9b..e1c12bb 100755
--- a/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
+++ b/src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh
@@ -55,6 +55,23 @@ check_url_live() {
[[ $(curl -sL -w '%{http_code}' $1 -o /dev/null) == 200 ]]
}
+test_file_removed() {
+ local _file=$1
+ local _success=0
+ for i in $(seq 1 10); do
+ if [[ ! -e $_file ]]; then
+ _success=1
+ break
+ fi
+ sleep 1
+ done
+
+ if [[ "$_success" -ne "1" ]]; then
+ echo "File was not removed."
+ exit 1
+ fi
+}
+
test_version() {
# The version number is written to stderr, making it necessary to redirect the output.
[[ $(aurora --version 2>&1) = $(cat /vagrant/.auroraversion) ]]
@@ -79,8 +96,10 @@ test_inspect() {
test_create() {
local _jobkey=$1 _config=$2
+ shift; shift
+ local _extra_args="${@}"
- aurora job create $_jobkey $_config
+ aurora job create $_jobkey $_config $_extra_args
}
test_job_status() {
@@ -288,6 +307,20 @@ test_admin() {
aurora_admin get_scheduler $_cluster | grep ":8081"
}
+test_ephemeral_daemon_with_final() {
+ local _cluster=$1 _role=$2 _env=$3 _job=$4 _config=$5
+ local _jobkey="$_cluster/$_role/$_env/$_job"
+ local _stop_file=$(mktemp)
+ local _extra_args="--bind stop_file=$_stop_file"
+ rm $_stop_file
+
+ test_create $_jobkey $_config $_extra_args
+ test_observer_ui $_cluster $_role $_job
+ test_job_status $_cluster $_role $_env $_job
+ touch $_stop_file # Stops 'main_process'.
+ test_file_removed $_stop_file # Removed by 'final_process'.
+}
+
restore_netrc() {
mv ~/.netrc.bak ~/.netrc >/dev/null 2>&1 || true
}
@@ -323,6 +356,8 @@ TEST_JOB_DOCKER=http_example_docker
TEST_CONFIG_FILE=$EXAMPLE_DIR/http_example.aurora
TEST_CONFIG_UPDATED_FILE=$EXAMPLE_DIR/http_example_updated.aurora
TEST_BAD_HEALTHCHECK_CONFIG_UPDATED_FILE=$EXAMPLE_DIR/http_example_bad_healthcheck.aurora
+TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB=ephemeral_daemon_with_final
+TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE=$TEST_ROOT/ephemeral_daemon_with_final.aurora
BASE_ARGS=(
$TEST_CLUSTER
@@ -341,6 +376,14 @@ TEST_JOB_DOCKER_ARGS=("${BASE_ARGS[@]}" "$TEST_JOB_DOCKER")
TEST_ADMIN_ARGS=($TEST_CLUSTER)
+TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS=(
+ $TEST_CLUSTER
+ $TEST_ROLE
+ $TEST_ENV
+ $TEST_EPHEMERAL_DAEMON_WITH_FINAL_JOB
+ $TEST_EPHEMERAL_DAEMON_WITH_FINAL_CONFIG_FILE
+)
+
trap collect_result EXIT
aurorabuild all
@@ -357,6 +400,8 @@ test_http_example "${TEST_JOB_DOCKER_ARGS[@]}"
test_admin "${TEST_ADMIN_ARGS[@]}"
test_basic_auth_unauthenticated "${TEST_JOB_ARGS[@]}"
+test_ephemeral_daemon_with_final "${TEST_JOB_EPHEMERAL_DAEMON_WITH_FINAL_ARGS[@]}"
+
/vagrant/src/test/sh/org/apache/aurora/e2e/test_kerberos_end_to_end.sh
/vagrant/src/test/sh/org/apache/aurora/e2e/test_bypass_leader_redirect_end_to_end.sh
RETCODE=0