Posted to commits@airflow.apache.org by as...@apache.org on 2021/01/20 23:28:56 UTC

[airflow-ci-infra] 01/01: A WIP/tmp commit.

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch user-data-WIP
in repository https://gitbox.apache.org/repos/asf/airflow-ci-infra.git

commit 5893384cd9a839bd61438afae319e205f69ebc7e
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Jan 20 23:27:34 2021 +0000

    A WIP/tmp commit.
    
    This runner-creds-fetcher script has been uploaded to S3 (it was too big
    to include in the userdata)
---
 cloud-init.yml          | 222 ++++++++++++++++++++++++++++++++++++++
 runner-creds-fetcher.py | 279 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 501 insertions(+)

diff --git a/cloud-init.yml b/cloud-init.yml
new file mode 100644
index 0000000..9bb37b0
--- /dev/null
+++ b/cloud-init.yml
@@ -0,0 +1,222 @@
+#cloud-config
+# 2021-01-20 22:12
+
+fs_setup:
+  - filesystem: ext4
+    device: /dev/nvme1n1
+    label: docker
+    partition: auto
+  - filesystem: ext4
+    device: /dev/nvme2n1
+    label: local-ssd
+    partition: auto
+
+mounts:
+  - [LABEL=docker, /var/lib/docker, auto, "defaults,nofail"]
+  - [LABEL=local-ssd, /opt/ssd, auto, "defaults,nofail,user=runner"]
+
+users:
+  - default
+  - name: runner
+
+packages:
+  - awscli
+  - build-essential
+  - docker-compose
+  - docker.io
+  - git
+  - iptables-persistent
+  - jq
+  - python3-dev
+  - python3-venv
+  - python3-wheel
+  - yarn
+
+runcmd:
+  -
+    - bash
+    - -c
+    - |
+      set -exu -o pipefail
+      echo "AWS_DEFAULT_REGION=$(cloud-init query region)" >> /etc/environment
+      set -a
+      . /etc/environment
+      set +a
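+      # Look up the ASG this instance belongs to from its aws:autoscaling:groupName tag; jq's @sh shell-quotes the value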
+      echo "ASG_GROUP_NAME=$(aws ec2 describe-tags --filter Name=resource-id,Values=$(cloud-init query instance_id) Name=key,Values=aws:autoscaling:groupName \
+            | jq -r '@sh "\(.Tags[0].Value)"')" >> /etc/environment
+
+      echo 'ACTIONS_SQS_URL=https://sqs.eu-central-1.amazonaws.com/827901512104/actions-runner-requests' >> /etc/environment
+
+      # Add environment to cron job
+      #cat /etc/environment >> /etc/cron.d/cloudwatch-metrics-github-runners
+  - [systemctl, daemon-reload]
+  -
+    - bash
+    - -c
+    - |
+      python3 -mvenv /opt/runner-creds-lock
+      /opt/runner-creds-lock/bin/pip install -U pip python-dynamodb-lock-whatnick==0.9.3 click==7.1.2
+  -
+    - bash
+    - -c
+    - |
+      set -exu -o pipefail
+
+      usermod -G docker -a runner
+
+      mkdir -p ~runner/actions-runner
+      cd ~runner/actions-runner
+
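+      # cloud-init passes the trailing list item below ("2.276.0-airflow1") as an extra argument to bash -c, so it shows up here as $0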
+      RUNNER_VERSION="$0"
+
+      curl -L "https://github.com/ashb/runner/releases/download/v${RUNNER_VERSION}/actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz" | tar -zx
+
+      aws s3 cp s3://airflow-ci-assets/get-runner-creds.py /opt/runner-creds-lock/bin/get-runner-creds
+      chmod 755 /opt/runner-creds-lock/bin/get-runner-creds
+    - 2.276.0-airflow1
+  - [mkdir, -p, /opt/ssd/hostedtoolcache, /opt/ssd/work, /opt/hostedtoolcache, /home/runner/actions-runner/_work ]
+  -
+    - bash
+    - -c
+    - |
+      echo '/opt/ssd/hostedtoolcache /opt/hostedtoolcache none defaults,bind 0 0' >> /etc/fstab
+      mount /opt/hostedtoolcache
+
+      echo '/opt/ssd/work /home/runner/actions-runner/_work none defaults,bind 0 0' >> /etc/fstab
+      mount /home/runner/actions-runner/_work
+      chown runner: /opt/ssd/work /home/runner/actions-runner/_work /opt/ssd/hostedtoolcache /opt/hostedtoolcache
+  - [systemctl, enable, --now, iptables.service]
+  - [systemctl, enable, actions.runner-credentials.service]
+  - [systemctl, enable, --now, actions.runner.service]
+  -
+    - bash
+    - -c
+    - |
+      echo "Pre-loading commonly used docker images from S3"
+      set -eux -o pipefail
+      aws s3 cp s3://airflow-ci-assets/pre-baked-images.tar.gz - | docker load
+
+write_files:
+  - path: /etc/systemd/system/actions.runner.service
+    content: |
+      [Unit]
+      Description=GitHub Actions Runner
+      After=network.target
+      Requires=actions.runner-credentials.service
+
+      [Service]
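+      # The '!' prefix makes systemd run the cleanup script with full privileges, ignoring the User=runner setting below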
+      ExecStartPre=!/usr/local/sbin/runner-cleanup-workdir.sh
+      ExecStart=/home/runner/actions-runner/run.sh --once --startuptype service
+      EnvironmentFile=/etc/environment
+      User=runner
+      WorkingDirectory=/home/runner/actions-runner
+      KillMode=process
+      KillSignal=SIGTERM
+      TimeoutStopSec=5min
+      Restart=always
+
+      [Install]
+      WantedBy=multi-user.target
+
+  # Don't put this in ~runner: these files get written before the user is created, which messes up creating the home dir
+  - path: /usr/local/sbin/runner-cleanup-workdir.sh
+    content: |
+      #!/bin/bash
+
+      if [[ -d ~runner/actions-runner/_work/airflow/airflow ]]; then
+        cd ~runner/actions-runner/_work/airflow/airflow
+
+        chown --changes -R runner: .
+        if [[ -e .git ]]; then
+          sudo -u runner bash -c "
+            git reset --hard && \
+            git submodule deinit --all -f && \
+            git submodule foreach git clean -fxd && \
+            git clean -fxd \
+          "
+        fi
+        docker ps -qa | xargs --no-run-if-empty docker rm -fv
+      fi
+
+      # We're idle now, ASG can kill us if it wants
+      aws autoscaling set-instance-protection  --no-protected-from-scale-in --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+
+      # Wait until we get an SQS message before continuing. This is mostly
+      # for scaling reasons rather than anything else, and isn't really needed
+      # for the actions runner to work
+      while true
+      do
+        msg="$(aws sqs receive-message --queue-url "$ACTIONS_SQS_URL" --max-number-of-messages 1)"
+        if [[ $? == 0 && -n "$msg" ]]; then
+          # We got a message!
+          aws --profile airflow sqs delete-message --queue-url "$ACTIONS_SQS_URL" --receipt-handle "$(jq '.Messages[0].ReceiptHandle' <<<"$msg" -r)"
+          # Set our instance to "busy" so ASG doesn't try to kill us
+          # TODO: This is a race -- some other runner may get the request from GitHub. We reset the protection
+          # every minute via cron job anyway.
+          aws autoscaling set-instance-protection --protected-from-scale-in --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+          exit 0
+        fi
+        sleep 5
+
+      done
+    owner: root:root
+    permissions: '0775'
+  - path: /etc/sudoers.d/runner
+    owner: root:root
+    permissions: '0440'
+    content: |
+      runner ALL=(ALL) NOPASSWD:/usr/sbin/swapoff -a, /usr/bin/rm -f /swapfile, /usr/bin/apt clean
+  - path: /etc/iptables/rules.v4
+    content: |
+      # Generated by iptables-save v1.8.4 on Thu Jan 14 13:59:27 2021
+      *filter
+      :INPUT ACCEPT [833:75929]
+      :FORWARD DROP [0:0]
+      :OUTPUT ACCEPT [794:143141]
+      :DOCKER-USER - [0:0]
+      -A FORWARD -j DOCKER-USER
+      # Disallow any docker container from accessing the metadata service
+      -A DOCKER-USER -d 169.254.169.254/32 -j REJECT --reject-with icmp-port-unreachable
+      -A DOCKER-USER -j RETURN
+      COMMIT
+
+  - path: /usr/local/sbin/actions-runner-ec2-reporting
+    permissions: '0775'
+    content: |
+      #!/bin/bash
+
+      if pgrep -c Runner.Worker >/dev/null; then
+          # Only report metric when we're doing something -- no point paying to submit zeros
+          aws cloudwatch put-metric-data --metric-name jobs-running --value "$(pgrep -c Runner.Worker)" --namespace github.actions
+          protection=--protected-from-scale-in
+      else
+          protection=--no-protected-from-scale-in
+      fi
+      aws autoscaling set-instance-protection "$protection" --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+
+  - path: /etc/cron.d/cloudwatch-metrics-github-runners
+    content: |
+      */1 * * * * nobody /usr/local/sbin/actions-runner-ec2-reporting
+
+  - path: /etc/systemd/system/actions.runner-credentials.service
+    content: |
+      [Unit]
+      Description=Credential fetcher for GitHub Actions Runner
+      After=network.target
+      Before=actions.runner.service
+
+      [Service]
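+      # Type=notify: the fetcher signals READY=1 only once it has locked and written a set of credentials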
+      Type=notify
+      ExecStart=/opt/runner-creds-lock/bin/python /opt/runner-creds-lock/bin/get-runner-creds
+      User=runner
+      WorkingDirectory=/home/runner/actions-runner
+      Restart=always
+      EnvironmentFile=/etc/environment
+
+      [Install]
+      WantedBy=multi-user.target
+apt:
+  sources:
+    yarn:
+      source: "deb https://dl.yarnpkg.com/debian/ stable main"
+      keyid: "1646B01B86E50310"
diff --git a/runner-creds-fetcher.py b/runner-creds-fetcher.py
new file mode 100755
index 0000000..335d450
--- /dev/null
+++ b/runner-creds-fetcher.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import json
+import logging
+import os
+import random
+import shutil
+import signal
+import socket
+from typing import Callable, List
+
+import boto3
+import click
+from python_dynamodb_lock.python_dynamodb_lock import DynamoDBLockClient, DynamoDBLockError
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+
+
+@click.command()
+@click.option('--repo', default='apache/airflow')
+@click.option(
+    '--output-folder',
+    help="Folder to write credentials to. Default of ~runner/actions-runner",
+    default='~runner/actions-runner',
+)
+def main(repo, output_folder):
+    log.info("Starting...")
+    output_folder = os.path.expanduser(output_folder)
+
+    short_time = datetime.timedelta(microseconds=1)
+
+    dynamodb = boto3.resource('dynamodb')
+    client = DynamoDBLockClient(
+        dynamodb,
+        table_name='GitHubRunnerLocks',
+        expiry_period=datetime.timedelta(0, 300),
+        heartbeat_period=datetime.timedelta(seconds=10),
+    )
+
+    # Just keep trying until we get some credentials.
+    while True:
+        # Have each runner try to get a credential in a random order.
+        possibles = get_possible_credentials(repo)
+        random.shuffle(possibles)
+
+        log.info("Trying to get a set of credentials in this order: %r", possibles)
+
+        notify = get_sd_notify_func()
+
+        for index in possibles:
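+            # With the ~1µs retry settings this is effectively a single non-blocking attempt per credential slot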
+            try:
+                lock = client.acquire_lock(
+                    f'{repo}/{index}',
+                    retry_period=short_time,
+                    retry_timeout=short_time,
+                    raise_context_exception=True,
+                )
+            except DynamoDBLockError as e:
+                log.info("Could not lock %s (%s)", index, e)
+                continue
+
+            with lock:
+                log.info("Obtained lock on %s", index)
+                write_credentials_to_files(repo, index, output_folder)
+                merge_in_settings(repo, output_folder)
+                notify(f"STATUS=Obtained lock on {index}")
+                complete_asg_lifecycle_hook()
+
+                def sig_handler(signal, frame):
+                    # no-op
+                    ...
+
+                signal.signal(signal.SIGINT, sig_handler)
+
+                notify("READY=1")
+                log.info("Waiting singal")
+                while True:
+                    # sleep until we are told to shut down
+                    signal.pause()
+                    log.info("Got signal")
+                    break
+
+            client.close()
+
+            exit()
+
+
+def get_sd_notify_func() -> Callable[[str], None]:
+    # http://www.freedesktop.org/software/systemd/man/sd_notify.html
+    addr = os.getenv('NOTIFY_SOCKET')
+    if not addr:
+        return lambda status: None
+
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
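+    # A NOTIFY_SOCKET address starting with '@' is an abstract socket; Python expects a leading NUL byte instead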
+    if addr[0] == '@':
+        addr = '\0' + addr[1:]
+    sock.connect(addr)
+
+    def notify(status: str):
+        sock.sendall(status.encode('utf-8'))
+
+    return notify
+
+
+def write_credentials_to_files(repo: str, index: str, out_folder: str = '~runner/actions-runner'):
+    param_path = os.path.join('/runners/', repo, index)
+
+    resp = boto3.client("ssm").get_parameters_by_path(Path=param_path, Recursive=False, WithDecryption=True)
+
+    param_to_file = {
+        'config': '.runner',
+        'credentials': '.credentials',
+        'rsaparams': '.credentials_rsaparams',
+    }
+
+    for param in resp['Parameters']:
+        # "/runners/apache/airflow/config" -> "config"
+        name = os.path.basename(param['Name'])
+        filename = param_to_file.get(name, None)
+        if filename is None:
+            log.info("Unknown Parameter from SSM: %r", param['Name'])
+            continue
+        log.info("Writing %r to %r", param['Name'], filename)
+        with open(os.path.join(out_folder, filename), "w") as fh:
+            fh.write(param['Value'])
+            shutil.chown(fh.name, 'runner')
+            os.chmod(fh.name, 0o600)
+        del param_to_file[name]
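+    # Anything still left in the mapping means an expected credential file was missing from SSM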
+    if param_to_file:
+        raise RuntimeError(f"Missing expected params: {list(param_to_file.keys())}")
+
+
+def merge_in_settings(repo: str, out_folder: str) -> None:
+    client = boto3.client('ssm')
+
+    param_path = os.path.join('/runners/', repo, 'configOverlay')
+    log.info("Loading config overlay from %s", param_path)
+
+    try:
+
+        resp = client.get_parameter(Name=param_path, WithDecryption=True)
+    except client.exceptions.ParameterNotFound:
+        log.debug("Failed to load config overlay", exc_info=True)
+        return
+
+    try:
+        overlay = json.loads(resp['Parameter']['Value'])
+    except ValueError:
+        log.debug("Failed to parse config overlay", exc_info=True)
+        return
+
+    with open(os.path.join(out_folder, ".runner"), "r+") as fh:
+        settings = json.load(fh)
+
+        for key, val in overlay.items():
+            settings[key] = val
+
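+        # Rewrite the settings file in place: rewind, truncate, then dump the merged JSON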
+        fh.seek(0, os.SEEK_SET)
+        os.ftruncate(fh.fileno(), 0)
+        json.dump(settings, fh, indent=2)
+
+
+def get_possible_credentials(repo: str) -> List[str]:
+    client = boto3.client("ssm")
+    paginator = client.get_paginator("describe_parameters")
+
+    path = os.path.join('/runners/', repo, '')
+    baked_path = os.path.join(path, 'runnersList')
+
+    # Pre-compute the list to avoid making lots of requests and getting throttled by the SSM API in case of
+    # a thundering herd
+    try:
+        log.info("Using pre-computed credentials indexes from %s", baked_path)
+        resp = client.get_parameter(Name=baked_path)
+        return resp['Parameter']['Value'].split(',')
+    except client.exceptions.ParameterNotFound:
+        pass
+
+    log.info("Looking at %s for possible credentials", path)
+
+    pages = paginator.paginate(
+        ParameterFilters=[{"Key": "Path", "Option": "Recursive", "Values": [path]}],
+        PaginationConfig={
+            "PageSize": 50,
+        },
+    )
+
+    seen = set()
+
+    for i, page in enumerate(pages):
+        log.info("Page %d", i)
+        for param in page['Parameters']:
+            name = param['Name']
+            log.info("%s", name)
+
+            # '/runners/x/1/config' -> '1/config',
+            # '/runners/x/y/1/config' -> 'y/1/config',
+            local_name = name[len(path) :]
+
+            try:
+                # '1/config' -> '1'
+                index, _ = local_name.split('/')
+            except ValueError:
+                # Ignore any 'x/y' when we asked for 'x'. There should only be an index and a filename
+                log.debug("Ignoring nested path %s", name)
+                continue
+
+            try:
+                # Check it's a number, but keep variable as string
+                int(index)
+            except ValueError:
+                log.debug("Ignoring non-numeric index %s", name)
+                continue
+
+            index = os.path.basename(os.path.dirname(name))
+            seen.add(index)
+
+    if not seen:
+        raise RuntimeError(f'No credentials found in SSM ParameterStore for {repo!r}')
+
+    try:
+        resp = client.put_parameter(
+            Name=baked_path, Type='StringList', Value=','.join(list(seen)), Overwrite=False
+        )
+        log.info("Stored pre-computed credentials indexes at %s", baked_path)
+    except client.exceptions.ParameterAlreadyExists:
+        # Race, we lost, never mind!
+        pass
+
+    return list(seen)
+
+
+def complete_asg_lifecycle_hook():
+    # Notify the ASG lifecycle hook that we are now In Service and ready to process requests
+
+    # Fetch the current instance ID from the file where cloud-init writes it
+    with open('/var/lib/cloud/data/instance-id') as fh:
+        instance_id = fh.readline().strip()
+
+    # Get the ASG name we are attached to by looking at our own tags
+    ec2 = boto3.client('ec2')
+    tags = ec2.describe_tags(
+        Filters=[
+            {'Name': 'key', 'Values': ['aws:autoscaling:groupName']},
+            {'Name': 'resource-id', 'Values': [instance_id]},
+        ]
+    )
+
+    asg_name = tags['Tags'][0]['Value']
+
+    asg_client = boto3.client('autoscaling')
+    asg_client.complete_lifecycle_action(
+        AutoScalingGroupName=asg_name,
+        InstanceId=instance_id,
+        LifecycleHookName='WaitForInstanceReportReady',
+        LifecycleActionResult='CONTINUE',
+    )
+
+
+if __name__ == "__main__":
+    main()