Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/09 15:38:15 UTC

[GitHub] [airflow] djrut opened a new issue #11377: "run id already exists" error when attempting to schedule multiple dag runs per second

djrut opened a new issue #11377:
URL: https://github.com/apache/airflow/issues/11377


   **Apache Airflow version**: 1.10.9+composer
   **Kubernetes version**: 1.16.13-gke.401
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Google Cloud Composer
   - **OS** (e.g. from /etc/os-release):
   
   ```
   NAME="Ubuntu"
   VERSION="16.04.7 LTS (Xenial Xerus)"
   ID=ubuntu
   ID_LIKE=debian
   PRETTY_NAME="Ubuntu 16.04.7 LTS"
   VERSION_ID="16.04"
   ```
   
   - **Kernel**: ```Linux airflow-worker-7d8654b878-5kxxr 4.19.112+ #1 SMP Fri Sep 4 12:00:04 PDT 2020 x86_64 x86_64 x86_64 GNU/Linux```
   - **Install tools**: gcloud
   - **Others**:
   
   **What happened**:
   
   Use-case: ETL DAG triggered by GCS Object Change notification (GCS -> Pub/Sub -> Cloud Function).
   
   The triggering function reports the following error when multiple dag runs are triggered within a 1-second interval:
   ```{"error":"Run id manual__2020-10-09T14:12:13+00:00 already exists for dag id example-generation"}```
   
   Full stack trace:
   
   ```
   2020-10-09 09:12:13.723 CDT generate_training_examples gdlq2rp9sgmd
   Traceback (most recent call last):
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
       response = self.full_dispatch_request()
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
       reraise(exc_type, exc_value, tb)
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
       raise value
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
       rv = self.dispatch_request()
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
       return self.view_functions[rule.endpoint](**req.view_args)
     File "/layers/google.python.functions-framework/functions-framework/lib/python3.8/site-packages/functions_framework/__init__.py", line 102, in view_func
       function(data, context)
     File "/workspace/main.py", line 49, in generate_training_examples
       make_iap_request(
     File "/workspace/main.py", line 88, in make_iap_request
       raise Exception(
   Exception: Bad response from application: 400 / {'Date': 'Fri, 09 Oct 2020 14:12:13 GMT', 'Content-Type': 'application/json', 'Content-Length': '98', 'Server': 'gunicorn/19.10.0', 'Via': '1.1 google', 'Alt-Svc': 'h3-Q050=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-27=":443"; ma=2592000,h3-T051=":443"; ma=2592000,h3-T050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"'} / '{"error":"Run id manual__2020-10-09T14:12:13+00:00 already exists for dag id <redacted>"}\n'
   ```
   Trigger request code (runs in Cloud Function):
   
   ```
   client_id = os.getenv("CLIENT_ID")
   # This should be part of your webserver's URL:
   # {tenant-project-id}.appspot.com
   webserver_id = os.getenv("TENANT_PROJECT")
   # The name of the DAG you wish to trigger
   dag_name = os.getenv("DAG_NAME")
   webserver_url = (
       'https://'
       + webserver_id
       + '.appspot.com/api/experimental/dags/'
       + dag_name
       + '/dag_runs'
   )
   # Make a POST request to IAP which then Triggers the DAG
   data['run_id'] = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')
   data['replace_microseconds'] = False

   conf = {"conf": data}
   print(f"JSON body = {conf}")

   make_iap_request(
       webserver_url, client_id, method='POST', json={"conf": data})
   ```
   
   The underlying cause is that the default run_id is generated from a timestamp with one-second resolution, e.g. ```manual__2020-10-09T14:12:13+00:00```. If multiple dag_runs are triggered within the same second, they receive the same run_id and the request fails as observed.
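
A minimal sketch of the collision, assuming the default run_id is built as `manual__` plus the ISO timestamp with microseconds dropped (the format seen in the error message). `default_style_run_id` is a hypothetical helper written for illustration, not an Airflow function.

```python
from datetime import datetime, timezone


def default_style_run_id(ts, replace_microseconds=True):
    """Hypothetical helper imitating the 'manual__<ISO timestamp>' pattern."""
    if replace_microseconds:
        # Dropping microseconds leaves one-second resolution.
        ts = ts.replace(microsecond=0)
    return "manual__" + ts.isoformat()


# Two triggers that arrive 0.8 s apart, within the same wall-clock second.
first = datetime(2020, 10, 9, 14, 12, 13, 100000, tzinfo=timezone.utc)
second = datetime(2020, 10, 9, 14, 12, 13, 900000, tzinfo=timezone.utc)

print(default_style_run_id(first))   # manual__2020-10-09T14:12:13+00:00
print(default_style_run_id(second))  # manual__2020-10-09T14:12:13+00:00

# Keeping microseconds makes the two ids distinct.
print(default_style_run_id(first, replace_microseconds=False))
# manual__2020-10-09T14:12:13.100000+00:00
```

With microseconds dropped the two ids are identical, which matches the "already exists" error above.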
   
   **What you expected to happen**:
   
   I tried two workarounds to fix this issue.
   
   ### Workaround 1 - pass custom "run_id" to trigger dag call following guidance [here](https://github.com/apache/airflow/issues/9790).
   
   Added ```data['run_id'] = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')``` to request body.
   
   I confirmed that this setting is being received as part of dag_run["conf"]:
   ```
   [2020-10-09 15:18:45,862] {logging_mixin.py:112} INFO - Config = {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'bucketId': '...', 'eventTime': '2020-10-09T14:46:45.145167Z', 'eventType': 'OBJECT_FINALIZE', 'notificationConfig': '...', 'objectGeneration': '...', 'objectId': '...', 'payloadFormat': 'JSON_API_V1'}, 'data': '...', 'run_id': 'alpaca_2020-10-09T14:46:50.490986', 'replace_microseconds': False}
   ```
   
   ### Workaround 2 - pass "replace_microseconds"
   
   Added ```data['replace_microseconds'] = False``` to Python code triggering the dag run. I also tried ```data['replace_microseconds'] = "false"```.
   
   I again confirmed the setting is present in dag_run["conf"].
   
   Neither of these workarounds changed the run_id from its default, so subsequent dag runs still have the format "manual__2020-10-09T15:28:17+00:00" and the error persists.
   
   **How to reproduce it**:
   
   Attempt to trigger multiple dag runs within a 1-second interval.
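
The failure mode can be imitated without an Airflow deployment. The sketch below is a toy in-memory stand-in, under the assumption that run ids must be unique per dag id and are derived from a timestamp truncated to whole seconds; `ToyDagRunRegistry` is hypothetical and is not Airflow code.

```python
from datetime import datetime, timezone


class ToyDagRunRegistry:
    """In-memory stand-in for a uniqueness check on (dag_id, run_id)."""

    def __init__(self):
        self._seen = set()

    def trigger(self, dag_id, when):
        # Assumption: the id is derived from the timestamp without microseconds.
        run_id = "manual__" + when.replace(microsecond=0).isoformat()
        if (dag_id, run_id) in self._seen:
            raise ValueError(
                f"Run id {run_id} already exists for dag id {dag_id}"
            )
        self._seen.add((dag_id, run_id))
        return run_id


registry = ToyDagRunRegistry()
t1 = datetime(2020, 10, 9, 14, 12, 13, 250000, tzinfo=timezone.utc)
t2 = datetime(2020, 10, 9, 14, 12, 13, 750000, tzinfo=timezone.utc)

registry.trigger("example-generation", t1)      # first trigger succeeds
try:
    registry.trigger("example-generation", t2)  # same second: rejected
    second_error = None
except ValueError as exc:
    second_error = str(exc)

print(second_error)
```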
   
   **Anything else we need to know**:
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on issue #11377: "run id already exists" error when attempting to schedule multiple dag runs per second

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #11377:
URL: https://github.com/apache/airflow/issues/11377#issuecomment-745233491


   I believe @PalashPaul-Ingka has it right -- the key addition being the `replace_microseconds` parameter.
   
   I'm going to close this, but if that doesn't work @djrut let us know and we can re-open this.





[GitHub] [airflow] ashb closed issue #11377: "run id already exists" error when attempting to schedule multiple dag runs per second

Posted by GitBox <gi...@apache.org>.
ashb closed issue #11377:
URL: https://github.com/apache/airflow/issues/11377


   





[GitHub] [airflow] PalashPaul-Ingka edited a comment on issue #11377: "run id already exists" error when attempting to schedule multiple dag runs per second

Posted by GitBox <gi...@apache.org>.
PalashPaul-Ingka edited a comment on issue #11377:
URL: https://github.com/apache/airflow/issues/11377#issuecomment-745232500


   @djrut please use the code below; it is working for me.
   
   ```python
   import os
   from datetime import datetime

   # `data` (the event payload) and `make_iap_request` are assumed to be
   # defined elsewhere in the Cloud Function.
   client_id = os.getenv("CLIENT_ID")
   # This should be part of your webserver's URL:
   # {tenant-project-id}.appspot.com
   webserver_id = os.getenv("TENANT_PROJECT")
   # The name of the DAG you wish to trigger
   dag_name = os.getenv("DAG_NAME")
   webserver_url = (
       'https://'
       + webserver_id
       + '.appspot.com/api/experimental/dags/'
       + dag_name
       + '/dag_runs'
   )
   # Make a POST request to IAP which then triggers the DAG.
   # run_id and replace_microseconds are top-level keys of the JSON body,
   # not entries inside "conf".
   run_id = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')

   body = {"conf": data, "run_id": run_id, "replace_microseconds": False}
   print(f"JSON body = {body}")

   make_iap_request(webserver_url, client_id, method='POST', json=body)
   ```
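
As a rough sketch of how this differs from the original request: there, `run_id` and `replace_microseconds` were added to `data`, so they arrived inside `conf`; here they are top-level keys of the JSON body. `build_trigger_body` is a hypothetical helper used only for illustration, and nothing in it calls Airflow.

```python
from datetime import datetime, timezone


def build_trigger_body(data, now=None):
    """Hypothetical helper: build the JSON body with run_id at the top level."""
    now = now or datetime.now(timezone.utc)
    # %f adds microseconds, so ids differ for triggers within the same second.
    run_id = now.strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')
    return {"conf": data, "run_id": run_id, "replace_microseconds": False}


payload = {"attributes": {"eventType": "OBJECT_FINALIZE"}}
body = build_trigger_body(payload, now=datetime(2020, 10, 9, 14, 46, 50, 490986))

print(body["run_id"])            # alpaca_2020-10-09T14:46:50.490986
print("run_id" in body["conf"])  # False
```

Two events landing in the same microsecond would still collide; appending a unique suffix to the id would be one way to rule that out.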





[GitHub] [airflow] PalashPaul-Ingka commented on issue #11377: "run id already exists" error when attempting to schedule multiple dag runs per second

Posted by GitBox <gi...@apache.org>.
PalashPaul-Ingka commented on issue #11377:
URL: https://github.com/apache/airflow/issues/11377#issuecomment-745232500


   @djrut please use the code below; it is working for me.
   
   client_id = os.getenv("CLIENT_ID")
       # This should be part of your webserver's URL:
       # {tenant-project-id}.appspot.com
       webserver_id = os.getenv("TENANT_PROJECT")
       # The name of the DAG you wish to trigger
       dag_name = os.getenv("DAG_NAME")
       webserver_url = (
           'https://'
           + webserver_id
           + '.appspot.com/api/experimental/dags/'
           + dag_name
           + '/dag_runs'
       )
       # Make a POST request to IAP which then Triggers the DAG
       run_id = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')
   
       conf = {"conf": data}
       print(f"JSON body = {conf}")
   
       make_iap_request(
           webserver_url, client_id, method='POST', json={"conf": data, "run_id": run_id, "replace_microseconds": False})





[GitHub] [airflow] boring-cyborg[bot] commented on issue #11377: "run id already exists" error when attempting to schedule multiple dag runs per second

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #11377:
URL: https://github.com/apache/airflow/issues/11377#issuecomment-706253094


   Thanks for opening your first issue here! Be sure to follow the issue template!
   




