You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/10/10 20:57:58 UTC
[incubator-heron] branch master updated: Refactor heron_executor to
make it easier to customize (#3043)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 65f90e0 Refactor heron_executor to make it easier to customize (#3043)
65f90e0 is described below
commit 65f90e039291a63de4ceb3f2df45fc3e2ce4357f
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Oct 10 13:57:53 2018 -0700
Refactor heron_executor to make it easier to customize (#3043)
* Refactor heron_executor to make it easier to customize
* fix unittest
---
heron/executor/src/python/heron_executor.py | 249 ++++++++++++---------
.../tests/python/heron_executor_unittest.py | 4 +-
2 files changed, 149 insertions(+), 104 deletions(-)
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 3f44b53..b411b3e 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -171,7 +171,7 @@ class HeronExecutor(object):
""" Heron executor is a class that is responsible for running each of the process on a given
container. Based on the container id and the instance distribution, it determines if the container
is a master node or a worker node and it starts processes accordingly."""
- def init_parsed_args(self, parsed_args):
+ def init_from_parsed_args(self, parsed_args):
""" initialize from parsed arguments """
self.shard = parsed_args.shard
self.topology_name = parsed_args.topology_name
@@ -263,7 +263,7 @@ class HeronExecutor(object):
def __init__(self, args, shell_env):
parsed_args = self.parse_args(args)
- self.init_parsed_args(parsed_args)
+ self.init_from_parsed_args(parsed_args)
self.shell_env = shell_env
self.max_runs = 100
@@ -552,86 +552,117 @@ class HeronExecutor(object):
def _get_java_instance_cmd(self, instance_info):
retval = {}
# TO DO (Karthik) to be moved into keys and defaults files
- code_cache_size_mb = 64
- java_metasize_mb = 128
-
- java_version = self._get_jvm_version()
- java_metasize_param = 'MetaspaceSize'
- if java_version.startswith("1.7") or \
- java_version.startswith("1.6") or \
- java_version.startswith("1.5"):
- java_metasize_param = 'PermSize'
+ instance_class_name = 'org.apache.heron.instance.HeronInstance'
if self.jvm_remote_debugger_ports and \
(len(instance_info) > len(self.jvm_remote_debugger_ports)):
Log.warn("Not enough remote debugger ports for all instances!")
+ # Create id to java command map
for (instance_id, component_name, global_task_id, component_index) in instance_info:
- total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
- heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
- Log.info("component name: %s, RAM request: %d, total JVM size: %dM, "
- "cache size: %dM, metaspace size: %dM"
- % (component_name, self.component_ram_map[component_name],
- total_jvm_size, code_cache_size_mb, java_metasize_mb))
- xmn_size = int(heap_size_mb / 2)
- instance_cmd = [os.path.join(self.heron_java_home, 'bin/java'),
- '-Xmx%dM' % heap_size_mb,
- '-Xms%dM' % heap_size_mb,
- '-Xmn%dM' % xmn_size,
- '-XX:Max%s=%dM' % (java_metasize_param, java_metasize_mb),
- '-XX:%s=%dM' % (java_metasize_param, java_metasize_mb),
- '-XX:ReservedCodeCacheSize=%dM' % code_cache_size_mb,
- '-XX:+CMSScavengeBeforeRemark',
- '-XX:TargetSurvivorRatio=90',
- '-XX:+PrintCommandLineFlags',
- '-verbosegc',
- '-XX:+PrintGCDetails',
- '-XX:+PrintGCTimeStamps',
- '-XX:+PrintGCDateStamps',
- '-XX:+PrintGCCause',
- '-XX:+UseGCLogFileRotation',
- '-XX:NumberOfGCLogFiles=5',
- '-XX:GCLogFileSize=100M',
- '-XX:+PrintPromotionFailure',
- '-XX:+PrintTenuringDistribution',
- '-XX:+PrintHeapAtGC',
- '-XX:+HeapDumpOnOutOfMemoryError',
- '-XX:+UseConcMarkSweepGC',
- '-XX:ParallelGCThreads=4',
- '-Xloggc:log-files/gc.%s.log' % instance_id]
-
+ # Append debugger ports
remote_debugger_port = None
if self.jvm_remote_debugger_ports:
remote_debugger_port = self.jvm_remote_debugger_ports.pop()
- instance_cmd.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
- % remote_debugger_port)
-
- instance_args = ['-topology_name', self.topology_name,
- '-topology_id', self.topology_id,
- '-instance_id', instance_id,
- '-component_name', component_name,
- '-task_id', str(global_task_id),
- '-component_index', str(component_index),
- '-stmgr_id', self.stmgr_ids[self.shard],
- '-stmgr_port', self.tmaster_controller_port,
- '-metricsmgr_port', self.metrics_manager_port,
- '-system_config_file', self.heron_internals_config_file,
- '-override_config_file', self.override_config_file]
- if remote_debugger_port:
- instance_args += ['-remote_debugger_port', remote_debugger_port]
-
- instance_cmd = instance_cmd + self.instance_jvm_opts.split()
- if component_name in self.component_jvm_opts:
- instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
-
- instance_cmd.extend(['-Djava.net.preferIPv4Stack=true',
- '-cp',
- '%s:%s' % (self.instance_classpath, self.classpath),
- 'org.apache.heron.instance.HeronInstance'] + instance_args)
+
+ instance_cmd = []
+ instance_cmd.append(self._get_jvm_instance_cmd()) # JVM command
+ instance_cmd.extend( # JVM options
+ self._get_jvm_instance_options(
+ instance_id, component_name, remote_debugger_port))
+ instance_cmd.append(instance_class_name) # Class name
+ instance_cmd.extend( # JVM arguments
+ self._get_jvm_instance_arguments(
+ instance_id, component_name, global_task_id, component_index, remote_debugger_port))
retval[instance_id] = instance_cmd
+
return retval
+ def _get_jvm_instance_cmd(self):
+ return os.path.join(self.heron_java_home, 'bin/java')
+
+ def _get_jvm_instance_options(self, instance_id, component_name, remote_debugger_port):
+ code_cache_size_mb = 64
+ java_metasize_mb = 128
+
+ total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
+ heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
+ Log.info("component name: %s, RAM request: %d, total JVM size: %dM, "
+ "cache size: %dM, metaspace size: %dM"
+ % (component_name, self.component_ram_map[component_name],
+ total_jvm_size, code_cache_size_mb, java_metasize_mb))
+ xmn_size = int(heap_size_mb / 2)
+
+ java_version = self._get_jvm_version()
+ java_metasize_param = 'MetaspaceSize'
+ if java_version.startswith("1.7") or \
+ java_version.startswith("1.6") or \
+ java_version.startswith("1.5"):
+ java_metasize_param = 'PermSize'
+
+ instance_options = [
+ '-Xmx%dM' % heap_size_mb,
+ '-Xms%dM' % heap_size_mb,
+ '-Xmn%dM' % xmn_size,
+ '-XX:Max%s=%dM' % (java_metasize_param, java_metasize_mb),
+ '-XX:%s=%dM' % (java_metasize_param, java_metasize_mb),
+ '-XX:ReservedCodeCacheSize=%dM' % code_cache_size_mb,
+ '-XX:+CMSScavengeBeforeRemark',
+ '-XX:TargetSurvivorRatio=90',
+ '-XX:+PrintCommandLineFlags',
+ '-verbosegc',
+ '-XX:+PrintGCDetails',
+ '-XX:+PrintGCTimeStamps',
+ '-XX:+PrintGCDateStamps',
+ '-XX:+PrintGCCause',
+ '-XX:+UseGCLogFileRotation',
+ '-XX:NumberOfGCLogFiles=5',
+ '-XX:GCLogFileSize=100M',
+ '-XX:+PrintPromotionFailure',
+ '-XX:+PrintTenuringDistribution',
+ '-XX:+PrintHeapAtGC',
+ '-XX:+HeapDumpOnOutOfMemoryError',
+ '-XX:+UseConcMarkSweepGC',
+ '-XX:ParallelGCThreads=4',
+ '-Xloggc:log-files/gc.%s.log' % instance_id,
+ '-Djava.net.preferIPv4Stack=true',
+ '-cp',
+ '%s:%s'% (self.instance_classpath, self.classpath)]
+
+ # Append the debugger agent option when a debugger port is available
+ if remote_debugger_port:
+ instance_options.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
+ % remote_debugger_port)
+
+ # Append user specified jvm options
+ instance_options.extend(self.instance_jvm_opts.split())
+ if component_name in self.component_jvm_opts:
+ instance_options.extend(self.component_jvm_opts[component_name].split())
+
+ return instance_options
+
+ def _get_jvm_instance_arguments(self, instance_id, component_name, global_task_id,
+ component_index, remote_debugger_port):
+ instance_args = [
+ '-topology_name', self.topology_name,
+ '-topology_id', self.topology_id,
+ '-instance_id', instance_id,
+ '-component_name', component_name,
+ '-task_id', str(global_task_id),
+ '-component_index', str(component_index),
+ '-stmgr_id', self.stmgr_ids[self.shard],
+ '-stmgr_port', self.tmaster_controller_port,
+ '-metricsmgr_port', self.metrics_manager_port,
+ '-system_config_file', self.heron_internals_config_file,
+ '-override_config_file', self.override_config_file]
+
+ # Append the debugger port argument when a debugger port is available
+ if remote_debugger_port:
+ instance_args += ['-remote_debugger_port', remote_debugger_port]
+
+ return instance_args
+
def _get_jvm_version(self):
if not self.jvm_version:
cmd = [os.path.join(self.heron_java_home, 'bin/java'),
@@ -894,6 +925,7 @@ class HeronExecutor(object):
def _start_processes(self, commands):
"""Start all commands and add them to the dict of processes to be monitored """
+ Log.info("Start processes")
processes_to_monitor = {}
# First start all the processes
for (name, command) in commands.items():
@@ -911,6 +943,7 @@ class HeronExecutor(object):
restarting any if they fail, up to max_runs times.
"""
# Now wait for any child to die
+ Log.info("Start process monitor")
while True:
if len(self.processes_to_monitor) > 0:
(pid, status) = os.wait()
@@ -1012,6 +1045,7 @@ class HeronExecutor(object):
"""
Receive updates to the packing plan from the statemgrs and update processes as needed.
"""
+ Log.info("Start state manager watches")
statemgr_config = StateMgrConfig()
statemgr_config.set_state_locations(configloader.load_state_manager_locations(
self.cluster, state_manager_config_file=self.state_manager_config_file,
@@ -1054,18 +1088,8 @@ class HeronExecutor(object):
for state_manager in self.state_managers:
state_manager.stop()
-def main():
- """Register exit handlers, initialize the executor and run it."""
- # Since Heron on YARN runs as headless users, pex compiled
- # binaries should be exploded into the container working
- # directory. In order to do this, we need to set the
- # PEX_ROOT shell environment before forking the processes
- shell_env = os.environ.copy()
- shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
-
- # Instantiate the executor, bind it to signal handlers and launch it
- executor = HeronExecutor(sys.argv, shell_env)
-
+def setup(executor):
+ """Set up log, process and signal handlers"""
# pylint: disable=unused-argument
def signal_handler(signal_to_handle, frame):
# We would do nothing here but just exit
@@ -1074,25 +1098,6 @@ def main():
executor.stop_state_manager_watches()
sys.exit(signal_to_handle)
- def setup(shardid):
- # Redirect stdout and stderr to files in append mode
- # The filename format is heron-executor-<container_id>.stdxxx
- log.configure(logfile='heron-executor-%s.stdout' % shardid)
-
- pid = os.getpid()
- sid = os.getsid(pid)
-
- # POSIX prohibits the change of the process group ID of a session leader
- if pid <> sid:
- Log.info('Set up process group; executor becomes leader')
- os.setpgrp() # create new process group, become its leader
-
- Log.info('Register the SIGTERM signal handler')
- signal.signal(signal.SIGTERM, signal_handler)
-
- Log.info('Register the atexit clean up')
- atexit.register(cleanup)
-
def cleanup():
"""Handler to trigger when receiving the SIGTERM signal
Do cleanup inside this method, including:
@@ -1102,11 +1107,51 @@ def main():
# We would not wait or check whether process spawned dead or not
os.killpg(0, signal.SIGTERM)
- setup(executor.shard)
+ # Redirect stdout and stderr to files in append mode
+ # The filename format is heron-executor-<container_id>.stdxxx
+ shardid = executor.shard
+ log.configure(logfile='heron-executor-%s.stdout' % shardid)
- executor.initialize()
+ pid = os.getpid()
+ sid = os.getsid(pid)
+
+ # POSIX prohibits the change of the process group ID of a session leader
+ if pid <> sid:
+ Log.info('Set up process group; executor becomes leader')
+ os.setpgrp() # create new process group, become its leader
+
+ Log.info('Register the SIGTERM signal handler')
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ Log.info('Register the atexit clean up')
+ atexit.register(cleanup)
+
+def start(executor):
+ """Set up environment and start executor"""
+ setup(executor)
+
+ # Start state manager watches, which are responsible for monitoring state and
+ # launching processes
executor.start_state_manager_watches()
+
+ # Start the process monitor, which is responsible for restarting processes when
+ # they die. This is the main loop of the executor
executor.start_process_monitor()
+def main():
+ """Register exit handlers, initialize the executor and run it."""
+ # Since Heron on YARN runs as headless users, pex compiled
+ # binaries should be exploded into the container working
+ # directory. In order to do this, we need to set the
+ # PEX_ROOT shell environment before forking the processes
+ shell_env = os.environ.copy()
+ shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
+
+ # Instantiate the executor, bind it to signal handlers and launch it
+ executor = HeronExecutor(sys.argv, shell_env)
+ executor.initialize()
+
+ start(executor)
+
if __name__ == "__main__":
main()
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index 3ea9e5c..7571d14 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -146,8 +146,8 @@ class HeronExecutorTest(unittest.TestCase):
"-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
"-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
"-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 " \
- "-Xloggc:log-files/gc.%s.log -XX:+HeapDumpOnOutOfMemoryError " \
- "-Djava.net.preferIPv4Stack=true -cp instance_classpath:classpath " \
+ "-Xloggc:log-files/gc.%s.log -Djava.net.preferIPv4Stack=true " \
+ "-cp instance_classpath:classpath -XX:+HeapDumpOnOutOfMemoryError " \
"org.apache.heron.instance.HeronInstance -topology_name topname -topology_id topid -instance_id %s -component_name %s -task_id %d -component_index 0 -stmgr_id stmgr-%d " \
"-stmgr_port tmaster_controller_port -metricsmgr_port metricsmgr_port -system_config_file %s -override_config_file %s" \
% (instance_name, instance_name, component_name, instance_id,