Posted to commits@airflow.apache.org by as...@apache.org on 2021/01/20 23:28:55 UTC

[airflow-ci-infra] branch user-data-WIP created (now 5893384)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch user-data-WIP
in repository https://gitbox.apache.org/repos/asf/airflow-ci-infra.git.


      at 5893384  A WIP/tmp commit.

This branch includes the following new commits:

     new 5893384  A WIP/tmp commit.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[airflow-ci-infra] 01/01: A WIP/tmp commit.

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch user-data-WIP
in repository https://gitbox.apache.org/repos/asf/airflow-ci-infra.git

commit 5893384cd9a839bd61438afae319e205f69ebc7e
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Jan 20 23:27:34 2021 +0000

    A WIP/tmp commit.
    
    This runner-creds-fetcher script has been uploaded to S3 (it was too big
    to include in the userdata)
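
    For reference, a command along these lines would have uploaded it to the
    location cloud-init.yml pulls it from (bucket and key taken from the
    aws s3 cp call in that file; the upload step itself isn't part of this
    commit):

        aws s3 cp runner-creds-fetcher.py s3://airflow-ci-assets/get-runner-creds.py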
---
 cloud-init.yml          | 222 ++++++++++++++++++++++++++++++++++++++
 runner-creds-fetcher.py | 279 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 501 insertions(+)

diff --git a/cloud-init.yml b/cloud-init.yml
new file mode 100644
index 0000000..9bb37b0
--- /dev/null
+++ b/cloud-init.yml
@@ -0,0 +1,222 @@
+#cloud-config
+# 2021-01-20 22:12
+
+fs_setup:
+  - filesystem: ext4
+    device: /dev/nvme1n1
+    label: docker
+    partition: auto
+  - filesystem: ext4
+    device: /dev/nvme2n1
+    label: local-ssd
+    partition: auto
+
+mounts:
+  - [LABEL=docker, /var/lib/docker, auto, "defaults,nofail"]
+  - [LABEL=local-ssd, /opt/ssd, auto, "defaults,nofail,user=runner"]
+
+users:
+  - default
+  - name: runner
+
+packages:
+  - awscli
+  - build-essential
+  - docker-compose
+  - docker.io
+  - git
+  - iptables-persistent
+  - jq
+  - python3-dev
+  - python3-venv
+  - python3-wheel
+  - yarn
+
+runcmd:
+  -
+    - bash
+    - -c
+    - |
+      set -exu -o pipefail
+      echo "AWS_DEFAULT_REGION=$(cloud-init query region)" >> /etc/environment
+      set -a
+      . /etc/environment
+      set +a
+      echo "ASG_GROUP_NAME=$(aws ec2 describe-tags --filter Name=resource-id,Values=$(cloud-init query instance_id) Name=key,Values=aws:autoscaling:groupName \
+            | jq -r '@sh "\(.Tags[0].Value)"')" >> /etc/environment
+
+      echo 'ACTIONS_SQS_URL=https://sqs.eu-central-1.amazonaws.com/827901512104/actions-runner-requests' >> /etc/environment
+
+      # Add environment to cron job
+      #cat /etc/environment >> /etc/cron.d/cloudwatch-metrics-github-runners
+  - [systemctl, daemon-reload]
+  -
+    - bash
+    - -c
+    - |
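+      # Isolated virtualenv for the credentials fetcher; it needs the DynamoDB lock client and click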
+      python3 -mvenv /opt/runner-creds-lock
+      /opt/runner-creds-lock/bin/pip install -U pip python-dynamodb-lock-whatnick==0.9.3 click==7.1.2
+  -
+    - bash
+    - -c
+    - |
+      set -exu -o pipefail
+
+      usermod -G docker -a runner
+
+      mkdir -p ~runner/actions-runner
+      cd ~runner/actions-runner
+
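+      # The extra list item below ("2.276.0-airflow1") is passed to bash -c as $0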
+      RUNNER_VERSION="$0"
+
+      curl -L "https://github.com/ashb/runner/releases/download/v${RUNNER_VERSION}/actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz" | tar -zx
+
+      aws s3 cp s3://airflow-ci-assets/get-runner-creds.py /opt/runner-creds-lock/bin/get-runner-creds
+      chmod 755 /opt/runner-creds-lock/bin/get-runner-creds
+    - 2.276.0-airflow1
+  - [mkdir, -p, /opt/ssd/hostedtoolcache, /opt/ssd/work, /opt/hostedtoolcache, /home/runner/actions-runner/_work ]
+  -
+    - bash
+    - -c
+    - |
+      echo '/opt/ssd/hostedtoolcache /opt/hostedtoolcache none defaults,bind 0 0' >> /etc/fstab
+      mount /opt/hostedtoolcache
+
+      echo '/opt/ssd/work /home/runner/actions-runner/_work none defaults,bind 0 0' >> /etc/fstab
+      mount /home/runner/actions-runner/_work
+      chown runner: /opt/ssd/work /home/runner/actions-runner/_work /opt/ssd/hostedtoolcache /opt/hostedtoolcache
+  - [systemctl, enable, --now, iptables.service]
+  - [systemctl, enable, actions.runner-credentials.service]
+  - [systemctl, enable, --now, actions.runner.service]
+  -
+    - bash
+    - -c
+    - |
+      echo "Pre-loading commonly used docker images from S3"
+      set -eux -o pipefail
+      aws s3 cp s3://airflow-ci-assets/pre-baked-images.tar.gz - | docker load
+
+write_files:
+  - path: /etc/systemd/system/actions.runner.service
+    content: |
+      [Unit]
+      Description=GitHub Actions Runner
+      After=network.target
+      Requires=actions.runner-credentials.service
+
+      [Service]
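+      # The "!" prefix on ExecStartPre runs the cleanup script with full privileges, even though the unit runs as the runner user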
+      ExecStartPre=!/usr/local/sbin/runner-cleanup-workdir.sh
+      ExecStart=/home/runner/actions-runner/run.sh --once --startuptype service
+      EnvironmentFile=/etc/environment
+      User=runner
+      WorkingDirectory=/home/runner/actions-runner
+      KillMode=process
+      KillSignal=SIGTERM
+      TimeoutStopSec=5min
+      Restart=always
+
+      [Install]
+      WantedBy=multi-user.target
+
+  # Don't put this in ~runner: these files are written before the user is created, which breaks creation of the home dir
+  - path: /usr/local/sbin/runner-cleanup-workdir.sh
+    content: |
+      #!/bin/bash
+
+      if [[ -d ~runner/actions-runner/_work/airflow/airflow ]]; then
+        cd ~runner/actions-runner/_work/airflow/airflow
+
+        chown --changes -R runner: .
+        if [[ -e .git ]]; then
+          sudo -u runner bash -c "
+            git reset --hard && \
+            git submodule deinit --all -f && \
+            git submodule foreach git clean -fxd && \
+            git clean -fxd \
+          "
+        fi
+        docker ps -qa | xargs --no-run-if-empty docker rm -fv
+      fi
+
+      # We're idle now, ASG can kill us if it wants
+      aws autoscaling set-instance-protection  --no-protected-from-scale-in --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+
+      # Wait until we get an SQS message before continuing. This is mostly
+      # for scaling reasons rather than anything else, and isn't strictly needed
+      # for the actions runner to work
+      while true
+      do
+        msg="$(aws sqs receive-message --queue-url "$ACTIONS_SQS_URL" --max-number-of-messages 1)"
+        if [[ $? == 0 && -n "$msg" ]]; then
+          # We got a message!
+          aws --profile airflow sqs delete-message --queue-url "$ACTIONS_SQS_URL" --receipt-handle "$(jq '.Messages[0].ReceiptHandle' <<<"$msg" -r)"
+          # Set our instance to "busy" so ASG doesn't try to kill us
+          # TODO: This is a race -- some other runner may get the request from GitHub. We reset the protection
+          # every minute via cron job anyway.
+          aws autoscaling set-instance-protection --protected-from-scale-in --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+          exit 0
+        fi
+        sleep 5
+
+      done
+    owner: root:root
+    permissions: '0775'
+  - path: /etc/sudoers.d/runner
+    owner: root:root
+    permissions: '0440'
+    content: |
+      runner ALL=(ALL) NOPASSWD:/usr/sbin/swapoff -a, /usr/bin/rm -f /swapfile, /usr/bin/apt clean
+  - path: /etc/iptables/rules.v4
+    content: |
+      # Generated by iptables-save v1.8.4 on Thu Jan 14 13:59:27 2021
+      *filter
+      :INPUT ACCEPT [833:75929]
+      :FORWARD DROP [0:0]
+      :OUTPUT ACCEPT [794:143141]
+      :DOCKER-USER - [0:0]
+      -A FORWARD -j DOCKER-USER
+      # Disallow any docker container from accessing the metadata service
+      -A DOCKER-USER -d 169.254.169.254/32 -j REJECT --reject-with icmp-port-unreachable
+      -A DOCKER-USER -j RETURN
+      COMMIT
+
+  - path: /usr/local/sbin/actions-runner-ec2-reporting
+    permissions: '0775'
+    content: |
+      #!/bin/bash
+
+      if pgrep -c Runner.Worker >/dev/null; then
+          # Only report metric when we're doing something -- no point paying to submit zeros
+          aws cloudwatch put-metric-data --metric-name jobs-running --value "$(pgrep -c Runner.Worker)" --namespace github.actions
+          protection=--protected-from-scale-in
+      else
+          protection=--no-protected-from-scale-in
+      fi
+      aws autoscaling set-instance-protection "$protection" --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+
+  - path: /etc/cron.d/cloudwatch-metrics-github-runners
+    content: |
+      */1 * * * * nobody /usr/local/sbin/actions-runner-ec2-reporting
+
+  - path: /etc/systemd/system/actions.runner-credentials.service
+    content: |
+      [Unit]
+      Description=Credential fetcher for GitHub Actions Runner
+      After=network.target
+      Before=actions.runner.service
+
+      [Service]
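+      # Type=notify: the fetcher signals READY=1 over sd_notify once credentials are in place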
+      Type=notify
+      ExecStart=/opt/runner-creds-lock/bin/python /opt/runner-creds-lock/bin/get-runner-creds
+      User=runner
+      WorkingDirectory=/home/runner/actions-runner
+      Restart=always
+      EnvironmentFile=/etc/environment
+
+      [Install]
+      WantedBy=multi-user.target
+apt:
+  sources:
+    yarn:
+      source: "deb https://dl.yarnpkg.com/debian/ stable main"
+      keyid: "1646B01B86E50310"
diff --git a/runner-creds-fetcher.py b/runner-creds-fetcher.py
new file mode 100755
index 0000000..335d450
--- /dev/null
+++ b/runner-creds-fetcher.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import json
+import logging
+import os
+import random
+import shutil
+import signal
+import socket
+from typing import Callable, List
+
+import boto3
+import click
+from python_dynamodb_lock.python_dynamodb_lock import DynamoDBLockClient, DynamoDBLockError
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+
+
+@click.command()
+@click.option('--repo', default='apache/airflow')
+@click.option(
+    '--output-folder',
+    help="Folder to write credentials to. Default of ~runner/actions-runner",
+    default='~runner/actions-runner',
+)
+def main(repo, output_folder):
+    log.info("Starting...")
+    output_folder = os.path.expanduser(output_folder)
+
+    short_time = datetime.timedelta(microseconds=1)
+
+    dynamodb = boto3.resource('dynamodb')
+    client = DynamoDBLockClient(
+        dynamodb,
+        table_name='GitHubRunnerLocks',
+        expiry_period=datetime.timedelta(0, 300),
+        heartbeat_period=datetime.timedelta(seconds=10),
+    )
+
+    # Just keep trying until we get some credentials.
+    while True:
+        # Have each runner try to get a credential in a random order.
+        possibles = get_possible_credentials(repo)
+        random.shuffle(possibles)
+
+        log.info("Trying to get a set of credentials in this order: %r", possibles)
+
+        notify = get_sd_notify_func()
+
+        for index in possibles:
+            try:
+                lock = client.acquire_lock(
+                    f'{repo}/{index}',
+                    retry_period=short_time,
+                    retry_timeout=short_time,
+                    raise_context_exception=True,
+                )
+            except DynamoDBLockError as e:
+                log.info("Could not lock %s (%s)", index, e)
+                continue
+
+            with lock:
+                log.info("Obtained lock on %s", index)
+                write_credentials_to_files(repo, index, output_folder)
+                merge_in_settings(repo, output_folder)
+                notify(f"STATUS=Obtained lock on {index}")
+                complete_asg_lifecycle_hook()
+
+                def sig_handler(signum, frame):
+                    # no-op -- we just need signal.pause() below to return
+                    ...
+
+                signal.signal(signal.SIGINT, sig_handler)
+
+                notify("READY=1")
+                log.info("Waiting singal")
+                while True:
+                    # sleep until we are told to shut down
+                    signal.pause()
+                    log.info("Got signal")
+                    break
+
+            client.close()
+
+            exit()
+
+
+def get_sd_notify_func() -> Callable[[str], None]:
+    # http://www.freedesktop.org/software/systemd/man/sd_notify.html
+    addr = os.getenv('NOTIFY_SOCKET')
+    if not addr:
+        return lambda status: None
+
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
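+    # A leading "@" denotes an abstract socket; convert it to the NUL-prefixed form connect() expects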
+    if addr[0] == '@':
+        addr = '\0' + addr[1:]
+    sock.connect(addr)
+
+    def notify(status: str):
+        sock.sendall(status.encode('utf-8'))
+
+    return notify
+
+
+def write_credentials_to_files(repo: str, index: str, out_folder: str = '~runner/actions-runner'):
+    param_path = os.path.join('/runners/', repo, index)
+
+    resp = boto3.client("ssm").get_parameters_by_path(Path=param_path, Recursive=False, WithDecryption=True)
+
+    param_to_file = {
+        'config': '.runner',
+        'credentials': '.credentials',
+        'rsaparams': '.credentials_rsaparams',
+    }
+
+    for param in resp['Parameters']:
+        # "/runners/apache/airflow/config" -> "config"
+        name = os.path.basename(param['Name'])
+        filename = param_to_file.get(name, None)
+        if filename is None:
+            log.info("Unknown Parameter from SSM: %r", param['Name'])
+            continue
+        log.info("Writing %r to %r", param['Name'], filename)
+        with open(os.path.join(out_folder, filename), "w") as fh:
+            fh.write(param['Value'])
+            shutil.chown(fh.name, 'runner')
+            os.chmod(fh.name, 0o600)
+        del param_to_file[name]
+    if param_to_file:
+        raise RuntimeError(f"Missing expected params: {list(param_to_file.keys())}")
+
+
+def merge_in_settings(repo: str, out_folder: str) -> None:
+    client = boto3.client('ssm')
+
+    param_path = os.path.join('/runners/', repo, 'configOverlay')
+    log.info("Loading config overlay from %s", param_path)
+
+    try:
+        resp = client.get_parameter(Name=param_path, WithDecryption=True)
+    except client.exceptions.ParameterNotFound:
+        log.debug("Failed to load config overlay", exc_info=True)
+        return
+
+    try:
+        overlay = json.loads(resp['Parameter']['Value'])
+    except ValueError:
+        log.debug("Failed to parse config overlay", exc_info=True)
+        return
+
+    with open(os.path.join(out_folder, ".runner"), "r+") as fh:
+        settings = json.load(fh)
+
+        for key, val in overlay.items():
+            settings[key] = val
+
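+        # Rewrite the file in place: rewind, truncate, then dump the merged settings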
+        fh.seek(0, os.SEEK_SET)
+        os.ftruncate(fh.fileno(), 0)
+        json.dump(settings, fh, indent=2)
+
+
+def get_possible_credentials(repo: str) -> List[str]:
+    client = boto3.client("ssm")
+    paginator = client.get_paginator("describe_parameters")
+
+    path = os.path.join('/runners/', repo, '')
+    baked_path = os.path.join(path, 'runnersList')
+
+    # Prefer the pre-computed list, to avoid making lots of requests and getting throttled by the
+    # SSM API in case of a thundering herd
+    try:
+        log.info("Checking for pre-computed credentials indexes at %s", baked_path)
+        resp = client.get_parameter(Name=baked_path)
+        return resp['Parameter']['Value'].split(',')
+    except client.exceptions.ParameterNotFound:
+        pass
+
+    log.info("Looking at %s for possible credentials", path)
+
+    pages = paginator.paginate(
+        ParameterFilters=[{"Key": "Path", "Option": "Recursive", "Values": [path]}],
+        PaginationConfig={
+            "PageSize": 50,
+        },
+    )
+
+    seen = set()
+
+    for i, page in enumerate(pages):
+        log.info("Page %d", i)
+        for param in page['Parameters']:
+            name = param['Name']
+            log.info("%s", name)
+
+            # '/runners/x/1/config' -> '1/config',
+            # '/runners/x/y/1/config' -> 'y/1/config',
+            local_name = name[len(path) :]
+
+            try:
+                # '1/config' -> '1'
+                index, _ = local_name.split('/')
+            except ValueError:
+                # Ignore any 'x/y' when we asked for 'x'. There should only be an index and a filename
+                log.debug("Ignoring nested path %s", name)
+                continue
+
+            try:
+                # Check it's a number, but keep variable as string
+                int(index)
+            except ValueError:
+                log.debug("Ignoring non-numeric index %s", name)
+                continue
+
+            index = os.path.basename(os.path.dirname(name))
+            seen.add(index)
+
+    if not seen:
+        raise RuntimeError(f'No credentials found in SSM ParameterStore for {repo!r}')
+
+    try:
+        resp = client.put_parameter(
+            Name=baked_path, Type='StringList', Value=','.join(list(seen)), Overwrite=False
+        )
+        log.info("Stored pre-computed credentials indexes at %s", baked_path)
+    except client.exceptions.ParameterAlreadyExists:
+        # Race, we lost, never mind!
+        pass
+
+    return list(seen)
+
+
+def complete_asg_lifecycle_hook():
+    # Notify the ASG lifecycle hook that we are now In Service and ready to process requests
+
+    # Fetch the current instance ID from where cloud-init writes it
+    with open('/var/lib/cloud/data/instance-id') as fh:
+        instance_id = fh.readline().strip()
+
+    # Get the ASG name we are attached to by looking at our own tags
+    ec2 = boto3.client('ec2')
+    tags = ec2.describe_tags(
+        Filters=[
+            {'Name': 'key', 'Values': ['aws:autoscaling:groupName']},
+            {'Name': 'resource-id', 'Values': [instance_id]},
+        ]
+    )
+
+    asg_name = tags['Tags'][0]['Value']
+
+    asg_client = boto3.client('autoscaling')
+    asg_client.complete_lifecycle_action(
+        AutoScalingGroupName=asg_name,
+        InstanceId=instance_id,
+        LifecycleHookName='WaitForInstanceReportReady',
+        LifecycleActionResult='CONTINUE',
+    )
+
+
+if __name__ == "__main__":
+    main()