You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/05/18 09:46:28 UTC

[airflow-ci-infra] 01/01: Periodically try to complete the lifecycle hook if we are Pending

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch better-lifecycle-hook
in repository https://gitbox.apache.org/repos/asf/airflow-ci-infra.git

commit 83c1c8009c8e5f021e01e46951f92bc857acbd17
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Tue May 18 10:41:32 2021 +0100

    Periodically try to complete the lifecycle hook if we are Pending
    
    Now that we boot from the pre-built AMI, the instance comes up much
    faster than before. As a result, the supervisor may start while the
    instance's ASG lifecycle state is still "Pending", which causes the
    complete_lifecycle_action call to fail.
    
    Rather than blocking the actions.runner service from starting (and so
    not running any jobs), we continue, periodically check whether we are
    still pending, and if so try to complete the lifecycle action again.
---
 .../packer/files/runner-supervisor.py              | 73 ++++++++++------------
 1 file changed, 32 insertions(+), 41 deletions(-)

diff --git a/github-runner-ami/packer/files/runner-supervisor.py b/github-runner-ami/packer/files/runner-supervisor.py
index d3d0e0c..6f1fa30 100755
--- a/github-runner-ami/packer/files/runner-supervisor.py
+++ b/github-runner-ami/packer/files/runner-supervisor.py
@@ -162,7 +162,9 @@ def main(repo, output_folder, user):
                 write_credentials_to_files(repo, index, output_folder, user)
                 merge_in_settings(repo, output_folder)
                 notify(f"STATUS=Obtained lock on {index}")
-                complete_asg_lifecycle_hook()
+
+                if get_lifecycle_state() == "Pending:Wait":
+                    complete_asg_lifecycle_hook()
 
                 notify("READY=1")
                 log.info("Watching for Runner.Worker processes")
@@ -324,33 +326,40 @@ OWN_ASG = None
 INSTANCE_ID = None
 
 
-def complete_asg_lifecycle_hook(hook_name='WaitForInstanceReportReady'):
-    global OWN_ASG, INSTANCE_ID
-    # Notify the ASG LifeCycle hook that we are now In Service and ready to
-    # process requests/safe to be shut down
+def get_lifecycle_state() -> str:
+    global INSTANCE_ID, OWN_ASG
 
-    # Fetch current instance ID from where cloutinit writes it to
     if not INSTANCE_ID:
         with open('/var/lib/cloud/data/instance-id') as fh:
             INSTANCE_ID = fh.readline().strip()
 
-    # Get the ASG name we are attached to by looking at our own tags
+    asg_client = boto3.client('autoscaling')
+
+    try:
+        instances = asg_client.describe_auto_scaling_instances(
+            InstanceIds=[INSTANCE_ID],
+        )['AutoScalingInstances']
+    except asg_client.exceptions.ClientError:
+        return "UNKNOWN"
+
+    if len(instances) != 1:
+        return "UNKNOWN"
+
+    details = instances[0]
+
     if not OWN_ASG:
-        ec2 = boto3.client('ec2')
-        tags = ec2.describe_tags(
-            Filters=[
-                {'Name': 'key', 'Values': ['aws:autoscaling:groupName']},
-                {'Name': 'resource-id', 'Values': [INSTANCE_ID]},
-            ]
-        )
+        OWN_ASG = details['AutoScalingGroupName']
+
+    return details['LifecycleState']
 
-        if len(tags['Tags']) == 0:
-            # Not part of an ASG.
-            return
 
-        OWN_ASG = tags['Tags'][0]['Value']
+def complete_asg_lifecycle_hook(hook_name='WaitForInstanceReportReady', retry=False):
+    global OWN_ASG, INSTANCE_ID
+    # Notify the ASG LifeCycle hook that we are now InService and ready to
+    # process requests/safe to be shut down
 
     asg_client = boto3.client('autoscaling')
+
     try:
         asg_client.complete_lifecycle_action(
             AutoScalingGroupName=OWN_ASG,
@@ -364,7 +373,7 @@ def complete_asg_lifecycle_hook(hook_name='WaitForInstanceReportReady'):
         # completed, so this would fail. That is not an error
 
         # We don't want the stacktrace here, just the message
-        log.warning("Failed to complete lifecycle hook: %s", str(e))
+        log.warning("Failed to complete lifecycle hook %s: %s", hook_name, str(e))
         pass
 
 
@@ -564,9 +573,12 @@ class ProcessWatcher:
     def check_still_alive(self):
         # Check ASG status
         if not self.in_termating_lifecycle:
-            if self.check_for_asg_termiate_requested():
+            state = get_lifecycle_state()
+            if state == 'Terminating:Wait':
                 self.in_termating_lifecycle = True
                 self.gracefully_terminate_runner()
+            elif state == 'Pending:Wait':
+                complete_asg_lifecycle_hook()
 
         # proc_connector is un-reliable (UDP) so periodically check if the processes are still alive
         if not self.interesting_processes:
@@ -593,27 +605,6 @@ class ProcessWatcher:
     def gracefully_terminate_runner(self):
         check_call(['systemctl', 'stop', 'actions.runner', '--no-block'])
 
-    def check_for_asg_termiate_requested(self):
-        """Check if the ASG stat of this instance is in the "Terminating:Wait" lifecycle state"""
-        if not OWN_ASG:
-            # Not part of an ASG
-            return False
-
-        asg_client = boto3.client('autoscaling')
-        try:
-            instances = asg_client.describe_auto_scaling_instances(
-                InstanceIds=[INSTANCE_ID],
-            )['AutoScalingInstances']
-
-            if len(instances) != 1:
-                return False
-
-            state = instances[0]['LifecycleState']
-
-            return state == 'Terminating:Wait'
-        except asg_client.exceptions.ClientError:
-            return False
-
     def protect_from_scale_in(self, protect: bool = True):
         """ Set (or unset) ProtectedFromScaleIn on our instance"""
         if not OWN_ASG: