Posted to commits@airflow.apache.org by as...@apache.org on 2021/01/20 23:28:56 UTC
[airflow-ci-infra] 01/01: A WIP/tmp commit.
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch user-data-WIP
in repository https://gitbox.apache.org/repos/asf/airflow-ci-infra.git
commit 5893384cd9a839bd61438afae319e205f69ebc7e
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Jan 20 23:27:34 2021 +0000
A WIP/tmp commit.
The runner-creds-fetcher script has been uploaded to S3 (it was too big
to include in the userdata)
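For reference, the upload was presumably done with something along these lines (the
bucket and key match the "aws s3 cp" fetch in cloud-init.yml below):

    aws s3 cp runner-creds-fetcher.py s3://airflow-ci-assets/get-runner-creds.py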
---
cloud-init.yml | 222 ++++++++++++++++++++++++++++++++++++++
runner-creds-fetcher.py | 279 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 501 insertions(+)
diff --git a/cloud-init.yml b/cloud-init.yml
new file mode 100644
index 0000000..9bb37b0
--- /dev/null
+++ b/cloud-init.yml
@@ -0,0 +1,222 @@
+#cloud-config
+# 2021-01-20 22:12
+
+fs_setup:
+  - filesystem: ext4
+    device: /dev/nvme1n1
+    label: docker
+    partition: auto
+  - filesystem: ext4
+    device: /dev/nvme2n1
+    label: local-ssd
+    partition: auto
+
+mounts:
+  - [LABEL=docker, /var/lib/docker, auto, "defaults,nofail"]
+  - [LABEL=local-ssd, /opt/ssd, auto, "defaults,nofail,user=runner"]
+
+users:
+  - default
+  - name: runner
+
+packages:
+  - awscli
+  - build-essential
+  - docker-compose
+  - docker.io
+  - git
+  - iptables-persistent
+  - jq
+  - python3-dev
+  - python3-venv
+  - python3-wheel
+  - yarn
+
+runcmd:
+  -
+    - bash
+    - -c
+    - |
+      set -exu -o pipefail
+      echo "AWS_DEFAULT_REGION=$(cloud-init query region)" >> /etc/environment
+      set -a
+      . /etc/environment
+      set +a
+      echo "ASG_GROUP_NAME=$(aws ec2 describe-tags --filter Name=resource-id,Values=$(cloud-init query instance_id) Name=key,Values=aws:autoscaling:groupName \
+          | jq -r '@sh "\(.Tags[0].Value)"')" >> /etc/environment
+
+      echo 'ACTIONS_SQS_URL=https://sqs.eu-central-1.amazonaws.com/827901512104/actions-runner-requests' >> /etc/environment
+
+      # Add environment to cron job
+      #cat /etc/environment >> /etc/cron.d/cloudwatch-metrics-github-runners
+  - [systemctl, daemon-reload]
+  -
+    - bash
+    - -c
+    - |
+      python3 -mvenv /opt/runner-creds-lock
+      /opt/runner-creds-lock/bin/pip install -U pip python-dynamodb-lock-whatnick==0.9.3 click==7.1.2
+  -
+    - bash
+    - -c
+    - |
+      set -exu -o pipefail
+
+      usermod -G docker -a runner
+
+      mkdir -p ~runner/actions-runner
+      cd ~runner/actions-runner
+
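+      # "$0" is set from the extra list item given after this script ("2.276.0-airflow1" below)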
+      RUNNER_VERSION="$0"
+
+      curl -L "https://github.com/ashb/runner/releases/download/v${RUNNER_VERSION}/actions-runner-linux-x64-${RUNNER_VERSION}.tar.gz" | tar -zx
+
+      aws s3 cp s3://airflow-ci-assets/get-runner-creds.py /opt/runner-creds-lock/bin/get-runner-creds
+      chmod 755 /opt/runner-creds-lock/bin/get-runner-creds
+    - 2.276.0-airflow1
+  - [mkdir, -p, /opt/ssd/hostedtoolcache, /opt/ssd/work, /opt/hostedtoolcache, /home/runner/actions-runner/_work]
+  -
+    - bash
+    - -c
+    - |
+      echo '/opt/ssd/hostedtoolcache /opt/hostedtoolcache none defaults,bind 0 0' >> /etc/fstab
+      mount /opt/hostedtoolcache
+
+      echo '/opt/ssd/work /home/runner/actions-runner/_work none defaults,bind 0 0' >> /etc/fstab
+      mount /home/runner/actions-runner/_work
+      chown runner: /opt/ssd/work /home/runner/actions-runner/_work /opt/ssd/hostedtoolcache /opt/hostedtoolcache
+  - [systemctl, enable, --now, iptables.service]
+  - [systemctl, enable, actions.runner-credentials.service]
+  - [systemctl, enable, --now, actions.runner.service]
+  -
+    - bash
+    - -c
+    - |
+      echo "Pre-loading commonly used docker images from S3"
+      set -eux -o pipefail
+      aws s3 cp s3://airflow-ci-assets/pre-baked-images.tar.gz - | docker load
+
+write_files:
+  - path: /etc/systemd/system/actions.runner.service
+    content: |
+      [Unit]
+      Description=GitHub Actions Runner
+      After=network.target
+      Requires=actions.runner-credentials.service
+
+      [Service]
+      ExecStartPre=!/usr/local/sbin/runner-cleanup-workdir.sh
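+      # The '!' prefix above makes systemd run the cleanup script with full privileges,
+      # even though User=runner applies to ExecStart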
+      ExecStart=/home/runner/actions-runner/run.sh --once --startuptype service
+      EnvironmentFile=/etc/environment
+      User=runner
+      WorkingDirectory=/home/runner/actions-runner
+      KillMode=process
+      KillSignal=SIGTERM
+      TimeoutStopSec=5min
+      Restart=always
+
+      [Install]
+      WantedBy=multi-user.target
+
+  # Don't put this in ~runner, as these get written before the user is added, and this messes up creating the home dir
+  - path: /usr/local/sbin/runner-cleanup-workdir.sh
+    content: |
+      #!/bin/bash
+
+      if [[ -d ~runner/actions-runner/_work/airflow/airflow ]]; then
+        cd ~runner/actions-runner/_work/airflow/airflow
+
+        chown --changes -R runner: .
+        if [[ -e .git ]]; then
+          sudo -u runner bash -c "
+            git reset --hard && \
+            git submodule deinit --all -f && \
+            git submodule foreach git clean -fxd && \
+            git clean -fxd \
+          "
+        fi
+        docker ps -qa | xargs --no-run-if-empty docker rm -fv
+      fi
+
+      # We're idle now, so the ASG can kill us if it wants
+      aws autoscaling set-instance-protection --no-protected-from-scale-in --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+
+      # Wait until we get an SQS message before continuing. This is more for
+      # scaling reasons than anything else; it isn't strictly needed for the
+      # actions runner to work.
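+      # (With default queue settings receive-message short-polls, hence the sleep below.)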
+      while true
+      do
+        msg="$(aws sqs receive-message --queue-url "$ACTIONS_SQS_URL" --max-number-of-messages 1)"
+        if [[ $? == 0 && -n "$msg" ]]; then
+          # We got a message!
+          aws sqs delete-message --queue-url "$ACTIONS_SQS_URL" --receipt-handle "$(jq '.Messages[0].ReceiptHandle' <<<"$msg" -r)"
+          # Set our instance to "busy" so the ASG doesn't try to kill us
+          # TODO: This is a race -- some other runner may get the request from GitHub. We reset
+          # the protection every minute via a cron job anyway.
+          aws autoscaling set-instance-protection --protected-from-scale-in --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+          exit 0
+        fi
+        sleep 5
+      done
+    owner: root:root
+    permissions: '0775'
+  - path: /etc/sudoers.d/runner
+    owner: root:root
+    permissions: '0440'
+    content: |
+      runner ALL=(ALL) NOPASSWD:/usr/sbin/swapoff -a, /usr/bin/rm -f /swapfile, /usr/bin/apt clean
+  - path: /etc/iptables/rules.v4
+    content: |
+      # Generated by iptables-save v1.8.4 on Thu Jan 14 13:59:27 2021
+      *filter
+      :INPUT ACCEPT [833:75929]
+      :FORWARD DROP [0:0]
+      :OUTPUT ACCEPT [794:143141]
+      :DOCKER-USER - [0:0]
+      -A FORWARD -j DOCKER-USER
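+      # Docker itself also inserts a jump to DOCKER-USER ahead of its own FORWARD rules, so these match first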
+      # Disallow docker containers from accessing the instance metadata service
+      -A DOCKER-USER -d 169.254.169.254/32 -j REJECT --reject-with icmp-port-unreachable
+      -A DOCKER-USER -j RETURN
+      COMMIT
+
+  - path: /usr/local/sbin/actions-runner-ec2-reporting
+    permissions: '0775'
+    content: |
+      #!/bin/bash
+
+      if pgrep -c Runner.Worker >/dev/null; then
+        # Only report the metric when we're doing something -- no point paying to submit zeros
+        aws cloudwatch put-metric-data --metric-name jobs-running --value "$(pgrep -c Runner.Worker)" --namespace github.actions
+        protection=--protected-from-scale-in
+      else
+        protection=--no-protected-from-scale-in
+      fi
+      aws autoscaling set-instance-protection "$protection" --instance-ids "$(cloud-init query instance_id)" --auto-scaling-group-name "$ASG_GROUP_NAME" >/dev/null
+
+  - path: /etc/cron.d/cloudwatch-metrics-github-runners
+    content: |
+      */1 * * * * nobody /usr/local/sbin/actions-runner-ec2-reporting
+
+  - path: /etc/systemd/system/actions.runner-credentials.service
+    content: |
+      [Unit]
+      Description=Credential fetcher for GitHub Actions Runner
+      After=network.target
+      Before=actions.runner.service
+
+      [Service]
+      Type=notify
+      ExecStart=/opt/runner-creds-lock/bin/python /opt/runner-creds-lock/bin/get-runner-creds
+      User=runner
+      WorkingDirectory=/home/runner/actions-runner
+      Restart=always
+      EnvironmentFile=/etc/environment
+
+      [Install]
+      WantedBy=multi-user.target
+
+apt:
+  sources:
+    yarn:
+      source: "deb https://dl.yarnpkg.com/debian/ stable main"
+      keyid: "1646B01B86E50310"
diff --git a/runner-creds-fetcher.py b/runner-creds-fetcher.py
new file mode 100755
index 0000000..335d450
--- /dev/null
+++ b/runner-creds-fetcher.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import json
+import logging
+import os
+import random
+import shutil
+import signal
+import socket
+from typing import Callable, List
+
+import boto3
+import click
+from python_dynamodb_lock.python_dynamodb_lock import DynamoDBLockClient, DynamoDBLockError
+
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger(__name__)
+log.setLevel(logging.DEBUG)
+
+
+@click.command()
+@click.option('--repo', default='apache/airflow')
+@click.option(
+    '--output-folder',
+    help="Folder to write credentials to. Defaults to ~runner/actions-runner",
+    default='~runner/actions-runner',
+)
+def main(repo, output_folder):
+    log.info("Starting...")
+    output_folder = os.path.expanduser(output_folder)
+
+    short_time = datetime.timedelta(microseconds=1)
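+    # Passed as retry_period/retry_timeout below, so acquire_lock() makes
+    # effectively a single non-blocking attempt per credential slot.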
+
+    dynamodb = boto3.resource('dynamodb')
+    client = DynamoDBLockClient(
+        dynamodb,
+        table_name='GitHubRunnerLocks',
+        expiry_period=datetime.timedelta(0, 300),
+        heartbeat_period=datetime.timedelta(seconds=10),
+    )
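+    # The client heartbeats a held lock every 10s; if this process dies, the
+    # lock expires after 300s and another runner can claim the credentials.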
+
+    # Just keep trying until we get some credentials.
+    while True:
+        # Have each runner try to get a credential in a random order.
+        possibles = get_possible_credentials(repo)
+        random.shuffle(possibles)
+
+        log.info("Trying to get a set of credentials in this order: %r", possibles)
+
+        notify = get_sd_notify_func()
+
+        for index in possibles:
+            try:
+                lock = client.acquire_lock(
+                    f'{repo}/{index}',
+                    retry_period=short_time,
+                    retry_timeout=short_time,
+                    raise_context_exception=True,
+                )
+            except DynamoDBLockError as e:
+                log.info("Could not lock %s (%s)", index, e)
+                continue
+
+            with lock:
+                log.info("Obtained lock on %s", index)
+                write_credentials_to_files(repo, index, output_folder)
+                merge_in_settings(repo, output_folder)
+                notify(f"STATUS=Obtained lock on {index}")
+                complete_asg_lifecycle_hook()
+
+                def sig_handler(signum, frame):
+                    # no-op; we only want signal.pause() below to return
+                    ...
+
+                signal.signal(signal.SIGINT, sig_handler)
+
+                notify("READY=1")
+                log.info("Waiting for signal")
+                while True:
+                    # sleep until we are told to shut down
+                    signal.pause()
+                    log.info("Got signal")
+                    break
+
+            client.close()
+
+            exit()
+
+
+def get_sd_notify_func() -> Callable[[str], None]:
+    # http://www.freedesktop.org/software/systemd/man/sd_notify.html
+    addr = os.getenv('NOTIFY_SOCKET')
+    if not addr:
+        return lambda status: None
+
+    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
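+    # A leading '@' denotes a Linux abstract-namespace socket; Python's socket
+    # API expects a leading NUL byte instead.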
+    if addr[0] == '@':
+        addr = '\0' + addr[1:]
+    sock.connect(addr)
+
+    def notify(status: str):
+        sock.sendall(status.encode('utf-8'))
+
+    return notify
+
+
+def write_credentials_to_files(repo: str, index: str, out_folder: str = '~runner/actions-runner'):
+    param_path = os.path.join('/runners/', repo, index)
+
+    resp = boto3.client("ssm").get_parameters_by_path(Path=param_path, Recursive=False, WithDecryption=True)
+
+    param_to_file = {
+        'config': '.runner',
+        'credentials': '.credentials',
+        'rsaparams': '.credentials_rsaparams',
+    }
+
+    for param in resp['Parameters']:
+        # "/runners/<repo>/<index>/config" -> "config"
+        name = os.path.basename(param['Name'])
+        filename = param_to_file.get(name, None)
+        if filename is None:
+            log.info("Unknown Parameter from SSM: %r", param['Name'])
+            continue
+        log.info("Writing %r to %r", param['Name'], filename)
+        with open(os.path.join(out_folder, filename), "w") as fh:
+            fh.write(param['Value'])
+            shutil.chown(fh.name, 'runner')
+            os.chmod(fh.name, 0o600)
+        del param_to_file[name]
+    if param_to_file:
+        raise RuntimeError(f"Missing expected params: {list(param_to_file.keys())}")
+
+
+def merge_in_settings(repo: str, out_folder: str) -> None:
+    client = boto3.client('ssm')
+
+    param_path = os.path.join('/runners/', repo, 'configOverlay')
+    log.info("Loading config overlay from %s", param_path)
+
+    try:
+        resp = client.get_parameter(Name=param_path, WithDecryption=True)
+    except client.exceptions.ParameterNotFound:
+        log.debug("Failed to load config overlay", exc_info=True)
+        return
+
+    try:
+        overlay = json.loads(resp['Parameter']['Value'])
+    except ValueError:
+        log.debug("Failed to parse config overlay", exc_info=True)
+        return
+
+    with open(os.path.join(out_folder, ".runner"), "r+") as fh:
+        settings = json.load(fh)
+
+        for key, val in overlay.items():
+            settings[key] = val
+
+        fh.seek(0, os.SEEK_SET)
+        os.ftruncate(fh.fileno(), 0)
+        json.dump(settings, fh, indent=2)
+
+
+def get_possible_credentials(repo: str) -> List[str]:
+    client = boto3.client("ssm")
+    paginator = client.get_paginator("describe_parameters")
+
+    path = os.path.join('/runners/', repo, '')
+    baked_path = os.path.join(path, 'runnersList')
+
+    # Use the pre-computed list if one exists, to avoid making lots of requests and getting
+    # throttled by the SSM API in case of a thundering herd
+    try:
+        resp = client.get_parameter(Name=baked_path)
+        log.info("Using pre-computed credentials indexes from %s", baked_path)
+        return resp['Parameter']['Value'].split(',')
+    except client.exceptions.ParameterNotFound:
+        pass
+
+    log.info("Looking at %s for possible credentials", path)
+
+    pages = paginator.paginate(
+        ParameterFilters=[{"Key": "Path", "Option": "Recursive", "Values": [path]}],
+        PaginationConfig={
+            "PageSize": 50,
+        },
+    )
+
+    seen = set()
+
+    for i, page in enumerate(pages):
+        log.info("Page %d", i)
+        for param in page['Parameters']:
+            name = param['Name']
+            log.info("%s", name)
+
+            # '/runners/x/1/config' -> '1/config',
+            # '/runners/x/y/1/config' -> 'y/1/config',
+            local_name = name[len(path):]
+
+            try:
+                # '1/config' -> '1'
+                index, _ = local_name.split('/')
+            except ValueError:
+                # Ignore any 'x/y' when we asked for 'x'. There should only be an index and a filename
+                log.debug("Ignoring nested path %s", name)
+                continue
+
+            try:
+                # Check it's a number, but keep the variable as a string
+                int(index)
+            except ValueError:
+                log.debug("Ignoring non-numeric index %s", name)
+                continue
+
+            seen.add(index)
+
+    if not seen:
+        raise RuntimeError(f'No credentials found in SSM ParameterStore for {repo!r}')
+
+    try:
+        client.put_parameter(
+            Name=baked_path, Type='StringList', Value=','.join(list(seen)), Overwrite=False
+        )
+        log.info("Stored pre-computed credentials indexes at %s", baked_path)
+    except client.exceptions.ParameterAlreadyExists:
+        # Race, we lost, never mind!
+        pass
+
+    return list(seen)
+
+
+def complete_asg_lifecycle_hook():
+    # Notify the ASG lifecycle hook that we are now InService and ready to process requests
+
+    # Fetch the current instance ID from where cloud-init writes it
+    with open('/var/lib/cloud/data/instance-id') as fh:
+        instance_id = fh.readline().strip()
+
+    # Get the name of the ASG we are attached to by looking at our own tags
+    ec2 = boto3.client('ec2')
+    tags = ec2.describe_tags(
+        Filters=[
+            {'Name': 'key', 'Values': ['aws:autoscaling:groupName']},
+            {'Name': 'resource-id', 'Values': [instance_id]},
+        ]
+    )
+
+    asg_name = tags['Tags'][0]['Value']
+
+    asg_client = boto3.client('autoscaling')
+    asg_client.complete_lifecycle_action(
+        AutoScalingGroupName=asg_name,
+        InstanceId=instance_id,
+        LifecycleHookName='WaitForInstanceReportReady',
+        LifecycleActionResult='CONTINUE',
+    )
+
+
+if __name__ == "__main__":
+    main()
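For reference, the actions.runner-credentials.service unit above runs the installed copy of
this script. A manual invocation would presumably look like the following (this assumes AWS
credentials that can reach the GitHubRunnerLocks DynamoDB table and the /runners/... SSM
parameters; the ASG lifecycle-hook call means it only fully works on a runner instance):

    /opt/runner-creds-lock/bin/python /opt/runner-creds-lock/bin/get-runner-creds \
        --repo apache/airflow --output-folder ~runner/actions-runner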