You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.)
Posted to commits@airflow.apache.org by as...@apache.org on 2021/01/19 15:03:55 UTC

[airflow-ci-infra] 01/01: Lambda function to scale ASG based on Github webhooks

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch autoscaling-lambda
in repository https://gitbox.apache.org/repos/asf/airflow-ci-infra.git

commit 9337f78138cc38b6d2ecee2838c5e68d6194c7ee
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Tue Jan 19 15:01:20 2021 +0000

    Lambda function to scale ASG based on Github webhooks
---
 .pre-commit-config.yaml                        |   2 +-
 webhooks/scale_out_runner/.chalice/config.json |  10 ++
 webhooks/scale_out_runner/.gitignore           |  20 +++
 webhooks/scale_out_runner/app.py               | 177 +++++++++++++++++++++++++
 webhooks/scale_out_runner/requirements.txt     |  19 +++
 5 files changed, 227 insertions(+), 1 deletion(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 02a11bd..fa32fe1 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -31,7 +31,7 @@ repos:
       - id: forbid-tabs
       - id: insert-license
         name: Add license
-        exclude: ^\.github/.*$|^license-templates/
+        exclude: ^\.github/.*$|^license-templates/|\.json$
         args:
           - --comment-style
           - "|#|"
diff --git a/webhooks/scale_out_runner/.chalice/config.json b/webhooks/scale_out_runner/.chalice/config.json
new file mode 100644
index 0000000..3bec265
--- /dev/null
+++ b/webhooks/scale_out_runner/.chalice/config.json
@@ -0,0 +1,10 @@
+{
+  "version": "2.0",
+  "app_name": "scale_out_runner",
+  "stages": {
+    "dev": {
+      "api_gateway_stage": "api"
+    }
+  },
+  "automatic_layer": true
+}
diff --git a/webhooks/scale_out_runner/.gitignore b/webhooks/scale_out_runner/.gitignore
new file mode 100644
index 0000000..9e4df26
--- /dev/null
+++ b/webhooks/scale_out_runner/.gitignore
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+.chalice/deployments/
+.chalice/venv/
+__pycache__/
diff --git a/webhooks/scale_out_runner/app.py b/webhooks/scale_out_runner/app.py
new file mode 100644
index 0000000..fa34d9a
--- /dev/null
+++ b/webhooks/scale_out_runner/app.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import hmac
+import json
+import os
+from typing import cast
+
+import boto3
+import pendulum
+from chalice import BadRequestError, Chalice, ForbiddenError
+from chalice.app import Request
+
+app = Chalice(app_name='scale_out_runner')
+
+INTERESTED_REPOS = os.getenv('REPOS', 'apache/airflow').split(',')
+ASG_GROUP_NAME = os.getenv('ASG_NAME', 'AshbRunnerASG')
+
+
+_commiters = set()
+
+
+@app.route('/', methods=['POST'])
+def index():
+    validate_gh_sig(app.current_request)
+
+    body = app.current_request.json_body
+
+    repo = body['repository']['full_name']
+
+    # Other repos configured with this app, but we don't do anything with them
+    # yet.
+    if repo not in INTERESTED_REPOS:
+        app.log.debug("Ignoring event for %r", repo)
+        return {'ignored': 'Other repo'}
+
+    sender = body['sender']['login']
+
+    use_self_hosted = sender in commiters()
+
+    payload = {'sender': sender, 'use_self_hosted': use_self_hosted}
+    if use_self_hosted:
+        if has_idle_instances():
+            payload['idle_instances'] = True
+        else:
+            payload['scaled_out'] = scale_out_runner_asg()
+    return payload
+
+
+def commiters(ssm_repo_name: str = os.getenv('SSM_REPO_NAME', 'apache/airflow')):
+    global _commiters
+
+    if not _commiters:
+        client = boto3.client('ssm')
+        param_path = os.path.join('/runners/', ssm_repo_name, 'configOverlay')
+        app.log.info("Loading config overlay from %s", param_path)
+
+        try:
+
+            resp = client.get_parameter(Name=param_path, WithDecryption=True)
+        except client.exceptions.ParameterNotFound:
+            app.log.debug("Failed to load config overlay", exc_info=True)
+            return set()
+
+        try:
+            overlay = json.loads(resp['Parameter']['Value'])
+        except ValueError:
+            app.log.debug("Failed to parse config overlay", exc_info=True)
+            return set()
+
+        _commiters = set(overlay['pullRequestSecurity']['allowedAuthors'])
+
+    return _commiters
+
+
+def validate_gh_sig(request: Request):
+    sig = request.headers['X-Hub-Signature-256']
+    if not sig.startswith('sha256='):
+        raise BadRequestError('X-Hub-Signature-256 not of expected format')
+
+    sig = sig[len('sha256=') :]
+    calculated_sig = sign_request_body(request)
+
+    app.log.debug('Checksum verification - expected %s got %s', calculated_sig, sig)
+
+    if not hmac.compare_digest(sig, calculated_sig):
+        raise ForbiddenError('Spoofed request')
+
+
+def sign_request_body(request: Request) -> str:
+    key = os.environ['GH_WEBHOOK_TOKEN'].encode('utf-8')
+    body = cast(bytes, request.raw_body)
+    return hmac.new(key, body, digestmod='SHA256').hexdigest()  # type: ignore
+
+
+def has_idle_instances():
+    client = boto3.client('cloudwatch')
+
+    end_time = pendulum.now().start_of('minute')
+    start_time = end_time.subtract(minutes=1)
+
+    resp = client.get_metric_data(
+        StartTime=start_time,
+        EndTime=end_time,
+        MaxDatapoints=1,
+        # This is likely far from perfect, as it only looks at the snapshot reported a minute ago.
+        MetricDataQueries=[
+            {"Id": "e1", "Expression": "m2 - m1", "Label": "Idle instances", "ReturnData": True},
+            {
+                "Id": "m2",
+                "MetricStat": {
+                    "Metric": {
+                        "Namespace": "AWS/AutoScaling",
+                        "MetricName": "GroupInServiceInstances",
+                        "Dimensions": [
+                            {
+                                "Name": "AutoScalingGroupName",
+                                "Value": ASG_GROUP_NAME,
+                            }
+                        ],
+                    },
+                    "Period": 60,
+                    "Stat": "Average",
+                },
+                "ReturnData": False,
+            },
+            {
+                "Id": "m1",
+                "MetricStat": {
+                    "Metric": {"Namespace": "github.actions", "MetricName": "jobs-running", "Dimensions": []},
+                    "Period": 60,
+                    "Stat": "Sum",
+                },
+                "ReturnData": False,
+            },
+        ],
+    )
+
+    idle_instances: float = resp['MetricDataResults'][0]['Values'][0]
+
+    return idle_instances > 0
+
+
+def scale_out_runner_asg():
+    asg = boto3.client('autoscaling')
+
+    resp = asg.describe_auto_scaling_groups(
+        AutoScalingGroupNames=[ASG_GROUP_NAME],
+    )
+
+    group = resp['AutoScalingGroups'][0]
+
+    desired = group['DesiredCapacity']
+    max_size = group['MaxSize']
+
+    try:
+        if desired < max_size:
+            asg.set_desired_capacity(AutoScalingGroupName=ASG_GROUP_NAME, DesiredCapacity=desired + 1)
+            return {'new capcity': desired + 1}
+        else:
+            return {'capcity_at_max': True}
+    except asg.exceptions.ScalingActivityInProgressFault as e:
+        return {'error': str(e)}
diff --git a/webhooks/scale_out_runner/requirements.txt b/webhooks/scale_out_runner/requirements.txt
new file mode 100644
index 0000000..97a5e83
--- /dev/null
+++ b/webhooks/scale_out_runner/requirements.txt
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+boto3
+pendulum ~=2.0