You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/10/10 20:57:58 UTC

[incubator-heron] branch master updated: Refactor heron_executor to make it easier to customize (#3043)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 65f90e0  Refactor heron_executor to make it easier to customize (#3043)
65f90e0 is described below

commit 65f90e039291a63de4ceb3f2df45fc3e2ce4357f
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Oct 10 13:57:53 2018 -0700

    Refactor heron_executor to make it easier to customize (#3043)
    
    * Refactor heron_executor to make it easier to customize
    
    * fix unittest
---
 heron/executor/src/python/heron_executor.py        | 249 ++++++++++++---------
 .../tests/python/heron_executor_unittest.py        |   4 +-
 2 files changed, 149 insertions(+), 104 deletions(-)

diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 3f44b53..b411b3e 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -171,7 +171,7 @@ class HeronExecutor(object):
   """ Heron executor is a class that is responsible for running each of the process on a given
   container. Based on the container id and the instance distribution, it determines if the container
   is a master node or a worker node and it starts processes accordingly."""
-  def init_parsed_args(self, parsed_args):
+  def init_from_parsed_args(self, parsed_args):
     """ initialize from parsed arguments """
     self.shard = parsed_args.shard
     self.topology_name = parsed_args.topology_name
@@ -263,7 +263,7 @@ class HeronExecutor(object):
 
   def __init__(self, args, shell_env):
     parsed_args = self.parse_args(args)
-    self.init_parsed_args(parsed_args)
+    self.init_from_parsed_args(parsed_args)
 
     self.shell_env = shell_env
     self.max_runs = 100
@@ -552,86 +552,117 @@ class HeronExecutor(object):
   def _get_java_instance_cmd(self, instance_info):
     retval = {}
     # TO DO (Karthik) to be moved into keys and defaults files
-    code_cache_size_mb = 64
-    java_metasize_mb = 128
-
-    java_version = self._get_jvm_version()
-    java_metasize_param = 'MetaspaceSize'
-    if java_version.startswith("1.7") or \
-            java_version.startswith("1.6") or \
-            java_version.startswith("1.5"):
-      java_metasize_param = 'PermSize'
+    instance_class_name = 'org.apache.heron.instance.HeronInstance'
 
     if self.jvm_remote_debugger_ports and \
             (len(instance_info) > len(self.jvm_remote_debugger_ports)):
       Log.warn("Not enough remote debugger ports for all instances!")
 
+    # Build a map from each instance id to its complete java command line
     for (instance_id, component_name, global_task_id, component_index) in instance_info:
-      total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
-      heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
-      Log.info("component name: %s, RAM request: %d, total JVM size: %dM, "
-               "cache size: %dM, metaspace size: %dM"
-               % (component_name, self.component_ram_map[component_name],
-                  total_jvm_size, code_cache_size_mb, java_metasize_mb))
-      xmn_size = int(heap_size_mb / 2)
-      instance_cmd = [os.path.join(self.heron_java_home, 'bin/java'),
-                      '-Xmx%dM' % heap_size_mb,
-                      '-Xms%dM' % heap_size_mb,
-                      '-Xmn%dM' % xmn_size,
-                      '-XX:Max%s=%dM' % (java_metasize_param, java_metasize_mb),
-                      '-XX:%s=%dM' % (java_metasize_param, java_metasize_mb),
-                      '-XX:ReservedCodeCacheSize=%dM' % code_cache_size_mb,
-                      '-XX:+CMSScavengeBeforeRemark',
-                      '-XX:TargetSurvivorRatio=90',
-                      '-XX:+PrintCommandLineFlags',
-                      '-verbosegc',
-                      '-XX:+PrintGCDetails',
-                      '-XX:+PrintGCTimeStamps',
-                      '-XX:+PrintGCDateStamps',
-                      '-XX:+PrintGCCause',
-                      '-XX:+UseGCLogFileRotation',
-                      '-XX:NumberOfGCLogFiles=5',
-                      '-XX:GCLogFileSize=100M',
-                      '-XX:+PrintPromotionFailure',
-                      '-XX:+PrintTenuringDistribution',
-                      '-XX:+PrintHeapAtGC',
-                      '-XX:+HeapDumpOnOutOfMemoryError',
-                      '-XX:+UseConcMarkSweepGC',
-                      '-XX:ParallelGCThreads=4',
-                      '-Xloggc:log-files/gc.%s.log' % instance_id]
-
+      # Take one remote debugger port for this instance, if any remain in the pool
       remote_debugger_port = None
       if self.jvm_remote_debugger_ports:
         remote_debugger_port = self.jvm_remote_debugger_ports.pop()
-        instance_cmd.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
-                            % remote_debugger_port)
-
-      instance_args = ['-topology_name', self.topology_name,
-                       '-topology_id', self.topology_id,
-                       '-instance_id', instance_id,
-                       '-component_name', component_name,
-                       '-task_id', str(global_task_id),
-                       '-component_index', str(component_index),
-                       '-stmgr_id', self.stmgr_ids[self.shard],
-                       '-stmgr_port', self.tmaster_controller_port,
-                       '-metricsmgr_port', self.metrics_manager_port,
-                       '-system_config_file', self.heron_internals_config_file,
-                       '-override_config_file', self.override_config_file]
-      if remote_debugger_port:
-        instance_args += ['-remote_debugger_port', remote_debugger_port]
-
-      instance_cmd = instance_cmd + self.instance_jvm_opts.split()
-      if component_name in self.component_jvm_opts:
-        instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
-
-      instance_cmd.extend(['-Djava.net.preferIPv4Stack=true',
-                           '-cp',
-                           '%s:%s' % (self.instance_classpath, self.classpath),
-                           'org.apache.heron.instance.HeronInstance'] + instance_args)
+
+      instance_cmd = []
+      instance_cmd.append(self._get_jvm_instance_cmd())             # JVM command
+      instance_cmd.extend(                                          # JVM options
+          self._get_jvm_instance_options(
+              instance_id, component_name, remote_debugger_port))
+      instance_cmd.append(instance_class_name)                      # Class name
+      instance_cmd.extend(                                          # Instance (main class) arguments
+          self._get_jvm_instance_arguments(
+              instance_id, component_name, global_task_id, component_index, remote_debugger_port))
 
       retval[instance_id] = instance_cmd
+
     return retval
 
+  def _get_jvm_instance_cmd(self):
+    return os.path.join(self.heron_java_home, 'bin/java')
+
+  def _get_jvm_instance_options(self, instance_id, component_name, remote_debugger_port):
+    code_cache_size_mb = 64
+    java_metasize_mb = 128
+
+    total_jvm_size = int(self.component_ram_map[component_name] / (1024 * 1024))
+    heap_size_mb = total_jvm_size - code_cache_size_mb - java_metasize_mb
+    Log.info("component name: %s, RAM request: %d, total JVM size: %dM, "
+             "cache size: %dM, metaspace size: %dM"
+             % (component_name, self.component_ram_map[component_name],
+                total_jvm_size, code_cache_size_mb, java_metasize_mb))
+    xmn_size = int(heap_size_mb / 2)
+
+    java_version = self._get_jvm_version()
+    java_metasize_param = 'MetaspaceSize'
+    if java_version.startswith("1.7") or \
+            java_version.startswith("1.6") or \
+            java_version.startswith("1.5"):
+      java_metasize_param = 'PermSize'
+
+    instance_options = [
+        '-Xmx%dM' % heap_size_mb,
+        '-Xms%dM' % heap_size_mb,
+        '-Xmn%dM' % xmn_size,
+        '-XX:Max%s=%dM' % (java_metasize_param, java_metasize_mb),
+        '-XX:%s=%dM' % (java_metasize_param, java_metasize_mb),
+        '-XX:ReservedCodeCacheSize=%dM' % code_cache_size_mb,
+        '-XX:+CMSScavengeBeforeRemark',
+        '-XX:TargetSurvivorRatio=90',
+        '-XX:+PrintCommandLineFlags',
+        '-verbosegc',
+        '-XX:+PrintGCDetails',
+        '-XX:+PrintGCTimeStamps',
+        '-XX:+PrintGCDateStamps',
+        '-XX:+PrintGCCause',
+        '-XX:+UseGCLogFileRotation',
+        '-XX:NumberOfGCLogFiles=5',
+        '-XX:GCLogFileSize=100M',
+        '-XX:+PrintPromotionFailure',
+        '-XX:+PrintTenuringDistribution',
+        '-XX:+PrintHeapAtGC',
+        '-XX:+HeapDumpOnOutOfMemoryError',
+        '-XX:+UseConcMarkSweepGC',
+        '-XX:ParallelGCThreads=4',
+        '-Xloggc:log-files/gc.%s.log' % instance_id,
+        '-Djava.net.preferIPv4Stack=true',
+        '-cp',
+        '%s:%s'% (self.instance_classpath, self.classpath)]
+
+    # Attach the JDWP debugger agent when a remote debugger port was assigned
+    if remote_debugger_port:
+      instance_options.append('-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=%s'
+                              % remote_debugger_port)
+
+    # Append user specified jvm options
+    instance_options.extend(self.instance_jvm_opts.split())
+    if component_name in self.component_jvm_opts:
+      instance_options.extend(self.component_jvm_opts[component_name].split())
+
+    return instance_options
+
+  def _get_jvm_instance_arguments(self, instance_id, component_name, global_task_id,
+                                  component_index, remote_debugger_port):
+    instance_args = [
+        '-topology_name', self.topology_name,
+        '-topology_id', self.topology_id,
+        '-instance_id', instance_id,
+        '-component_name', component_name,
+        '-task_id', str(global_task_id),
+        '-component_index', str(component_index),
+        '-stmgr_id', self.stmgr_ids[self.shard],
+        '-stmgr_port', self.tmaster_controller_port,
+        '-metricsmgr_port', self.metrics_manager_port,
+        '-system_config_file', self.heron_internals_config_file,
+        '-override_config_file', self.override_config_file]
+
+    # Pass the remote debugger port to the instance when one was assigned
+    if remote_debugger_port:
+      instance_args += ['-remote_debugger_port', remote_debugger_port]
+
+    return instance_args
+
   def _get_jvm_version(self):
     if not self.jvm_version:
       cmd = [os.path.join(self.heron_java_home, 'bin/java'),
@@ -894,6 +925,7 @@ class HeronExecutor(object):
 
   def _start_processes(self, commands):
     """Start all commands and add them to the dict of processes to be monitored """
+    Log.info("Start processes")
     processes_to_monitor = {}
     # First start all the processes
     for (name, command) in commands.items():
@@ -911,6 +943,7 @@ class HeronExecutor(object):
     restarting any if they fail, up to max_runs times.
     """
     # Now wait for any child to die
+    Log.info("Start process monitor")
     while True:
       if len(self.processes_to_monitor) > 0:
         (pid, status) = os.wait()
@@ -1012,6 +1045,7 @@ class HeronExecutor(object):
     """
     Receive updates to the packing plan from the statemgrs and update processes as needed.
     """
+    Log.info("Start state manager watches")
     statemgr_config = StateMgrConfig()
     statemgr_config.set_state_locations(configloader.load_state_manager_locations(
         self.cluster, state_manager_config_file=self.state_manager_config_file,
@@ -1054,18 +1088,8 @@ class HeronExecutor(object):
     for state_manager in self.state_managers:
       state_manager.stop()
 
-def main():
-  """Register exit handlers, initialize the executor and run it."""
-  # Since Heron on YARN runs as headless users, pex compiled
-  # binaries should be exploded into the container working
-  # directory. In order to do this, we need to set the
-  # PEX_ROOT shell environment before forking the processes
-  shell_env = os.environ.copy()
-  shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
-
-  # Instantiate the executor, bind it to signal handlers and launch it
-  executor = HeronExecutor(sys.argv, shell_env)
-
+def setup(executor):
+  """Set up logging, the process group, and the SIGTERM/atexit handlers"""
   # pylint: disable=unused-argument
   def signal_handler(signal_to_handle, frame):
     # We would do nothing here but just exit
@@ -1074,25 +1098,6 @@ def main():
     executor.stop_state_manager_watches()
     sys.exit(signal_to_handle)
 
-  def setup(shardid):
-    # Redirect stdout and stderr to files in append mode
-    # The filename format is heron-executor-<container_id>.stdxxx
-    log.configure(logfile='heron-executor-%s.stdout' % shardid)
-
-    pid = os.getpid()
-    sid = os.getsid(pid)
-
-    # POSIX prohibits the change of the process group ID of a session leader
-    if pid <> sid:
-      Log.info('Set up process group; executor becomes leader')
-      os.setpgrp() # create new process group, become its leader
-
-    Log.info('Register the SIGTERM signal handler')
-    signal.signal(signal.SIGTERM, signal_handler)
-
-    Log.info('Register the atexit clean up')
-    atexit.register(cleanup)
-
   def cleanup():
     """Handler to trigger when receiving the SIGTERM signal
     Do cleanup inside this method, including:
@@ -1102,11 +1107,51 @@ def main():
     # We would not wait or check whether process spawned dead or not
     os.killpg(0, signal.SIGTERM)
 
-  setup(executor.shard)
+  # Redirect stdout and stderr to files in append mode
+  # The filename format is heron-executor-<container_id>.stdxxx
+  shardid = executor.shard
+  log.configure(logfile='heron-executor-%s.stdout' % shardid)
 
-  executor.initialize()
+  pid = os.getpid()
+  sid = os.getsid(pid)
+
+  # POSIX prohibits the change of the process group ID of a session leader
+  if pid <> sid:
+    Log.info('Set up process group; executor becomes leader')
+    os.setpgrp() # create new process group, become its leader
+
+  Log.info('Register the SIGTERM signal handler')
+  signal.signal(signal.SIGTERM, signal_handler)
+
+  Log.info('Register the atexit clean up')
+  atexit.register(cleanup)
+
+def start(executor):
+  """Run setup() for the executor, then start its state manager watches and process monitor"""
+  setup(executor)
+
+  # Start the state manager watches, which receive packing plan updates and
+  # launch or update processes as needed
   executor.start_state_manager_watches()
+
+  # Start the process monitor, which restarts processes when they die (up to
+  # max_runs times). This is the main loop of the executor
   executor.start_process_monitor()
 
+def main():
+  """Register exit handlers, initialize the executor and run it."""
+  # Since Heron on YARN runs as headless users, pex compiled
+  # binaries should be exploded into the container working
+  # directory. In order to do this, we need to set the
+  # PEX_ROOT shell environment before forking the processes
+  shell_env = os.environ.copy()
+  shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")
+
+  # Instantiate and initialize the executor, then set up its handlers and launch it
+  executor = HeronExecutor(sys.argv, shell_env)
+  executor.initialize()
+
+  start(executor)
+
 if __name__ == "__main__":
   main()
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index 3ea9e5c..7571d14 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -146,8 +146,8 @@ class HeronExecutorTest(unittest.TestCase):
            "-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M " \
            "-XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution -XX:+PrintHeapAtGC " \
            "-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:ParallelGCThreads=4 " \
-           "-Xloggc:log-files/gc.%s.log -XX:+HeapDumpOnOutOfMemoryError " \
-           "-Djava.net.preferIPv4Stack=true -cp instance_classpath:classpath " \
+           "-Xloggc:log-files/gc.%s.log -Djava.net.preferIPv4Stack=true " \
+           "-cp instance_classpath:classpath -XX:+HeapDumpOnOutOfMemoryError " \
            "org.apache.heron.instance.HeronInstance -topology_name topname -topology_id topid -instance_id %s -component_name %s -task_id %d -component_index 0 -stmgr_id stmgr-%d " \
            "-stmgr_port tmaster_controller_port -metricsmgr_port metricsmgr_port -system_config_file %s -override_config_file %s" \
            % (instance_name, instance_name, component_name, instance_id,