Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/12 05:21:49 UTC

[GitHub] [airflow] yangmingfu404 opened a new pull request, #24986: Update dag.py

yangmingfu404 opened a new pull request, #24986:
URL: https://github.com/apache/airflow/pull/24986

   update start bug
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boring-cyborg[bot] commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181327311

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally. It's a heavy Docker environment, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181533277

   When starting a project, an error is reported in the log




[GitHub] [airflow] potiuk commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181591413

   Where do you see the errors? In which logs? When?




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181549486

   > unless you do something strange
   
   I have not reassigned self.task_dict, and the configuration file has not been modified.
   
   1. Airflow was installed in the following way.
   2. Airflow was launched with `airflow standalone`.
   
   Usage scenario:
        Configure a DAG that calls an external server. If the server reports a certain state, retry the current task; after a certain number of retries, fail the current task.
   
   # Airflow needs a home. `~/airflow` is the default, but you can put it
   # somewhere else if you prefer (optional)
   export AIRFLOW_HOME=~/airflow
   
   # Install Airflow using the constraints file
   AIRFLOW_VERSION=2.3.2
   PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
   # For example: 3.7
   CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
   # For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.2/constraints-3.7.txt
   pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
   
   # The Standalone command will initialise the database, make a user,
   # and start all components for you.
   airflow standalone
   
   # Visit localhost:8080 in the browser and use the admin account details
   # shown on the terminal to login.
   # Enable the example_bash_operator dag in the home page
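
The usage scenario described in this comment (call an external server, retry the current task while the server reports an in-progress state, and fail after a fixed number of attempts) can be sketched in plain Python. This is only an illustration under stated assumptions: `poll_until_done`, `fetch_state`, and `IN_PROGRESS_STATES` are hypothetical names, and in Airflow the same effect would normally come from an operator's `retries` and `retry_delay` settings rather than a hand-written loop.

```python
import time
from typing import Callable

# Hypothetical set of "still working" states (assumption for this sketch).
IN_PROGRESS_STATES = {3, 4, 5, 8}


def poll_until_done(
    fetch_state: Callable[[], int],
    max_retries: int,
    delay_seconds: float = 0.0,
) -> int:
    """Call fetch_state until it returns a state that is not in progress.

    Raises RuntimeError once the initial try and all retries are used up.
    """
    attempts = max_retries + 1  # one initial try plus the retries
    for attempt in range(attempts):
        state = fetch_state()
        if state not in IN_PROGRESS_STATES:
            return state
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    raise RuntimeError(f"still in progress after {max_retries} retries")


# Usage: a fake server that is busy twice and then reports state 0.
responses = iter([8, 8, 0])
final_state = poll_until_done(lambda: next(responses), max_retries=5)
print(final_state)
```

The retry budget is explicit here so that a server stuck in an in-progress state ends in a failure instead of polling forever.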
   




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181544394

   > 
   
   When I start a project with no file in the DAG directory, no error about self.task_dict being None is reported. But when self.task_dict.keys() is called, self.task_dict is used as a dictionary, and errors are frequently reported in the log.
   
   The files placed in my DAG directory are similar to this demo format
   
   `
   import random
   import time
   import json
   
   from datetime import datetime, timedelta
   from textwrap import dedent
   
   # The DAG object; we'll need this to instantiate a DAG
   from airflow import DAG
   
   # Operators; we need this to operate!
   from airflow.operators.bash import BashOperator
   from airflow.operators.python import PythonOperator, BranchPythonOperator
   from airflow.providers.http.operators.http import SimpleHttpOperator
   from airflow.providers.http.sensors.http import HttpSensor
   from airflow.utils.trigger_rule import TriggerRule
   from airflow.models import Variable
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['zhanghe@haomo.ai'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   
   
   SYNC_TASK_STATE_RUNNING = 8
   SYNC_TASK_STATE_SUCCESS = 0
   SYNC_TASK_STATE_FAIL = 1
   # 2=failed, 8=running, 3=preparing data, 4=queued, 5=writing results, 9=not executed, 0=succeeded
   SYNC_TASK_STATE_LIST = [3, 4, 5, 8]
   
   
   def http_response_check(rps):
       if not rps.ok:
           return False
   
       rps = rps.json()
       if rps["code"] != 0:
           return False
   
       if not rps["data"]["task_id"]:
           return False
   
       return True
   
   
   def lock_response_check(rps):
       if not rps.ok:
           return False
   
       rps = rps.json()
       if rps["code"] != 0:
           return False
   
       return True
   
   
   def http_response_check_sync(rps):
       """Check whether the task is still running or has finished."""
       if not rps.ok:
           return False
   
       rps = rps.json()
       if rps["code"] != 0:
           return False
   
       if not rps["data"]["task_id"]:
           return False
   
       if int(rps["data"]["task_state"]) in SYNC_TASK_STATE_LIST:
           return False
   
       return True
   
   
   def http_response_filter(rps, execution_date=None, dag_next_task_ids=None):
       rps = rps.json()
       data = rps["data"]
       out = dict(task_id=data["task_id"], instance_id=data["instance_id"],
                  execution_date=data["execution_date"], dag_id=dag_id)
   
       if execution_date is not None:
           out["execution_date"] = execution_date
   
       if dag_next_task_ids is not None:
           out["dag_next_task_ids"] = dag_next_task_ids
       return json.dumps(out)
   
   
   def sync_http_response_filter(rps, next_task_type, business_type):
       rps = rps.json()
       data = rps["data"]
       out = dict(
           task_state=data["task_state"],
           parent_id=data["task_id"],
           instance_id=data["instance_id"],
           execution_date=data["execution_date"],
           task_type=next_task_type,
           business_type=business_type,
           dag_id=dag_id,
       )
       return json.dumps(out)
   
   
   def gen_instance_id():
       rand = random.randint(10000, 99999)
       return '{ts}{plac}{rand}'.format(
           ts=int(time.time() * 1000000),
           plac="000",
           rand=rand,
       )
   
   
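   # Number of parallel workers per DAG, adjustable through an Airflow Variable
   task_worker_num_dict = {}
   # default_var avoids a KeyError when the Variable has not been created yet
   task_worker_num_str = Variable.get('task_worker_num_dict', default_var=None)
   if task_worker_num_str:
       try:
           task_worker_num_dict = json.loads(task_worker_num_str)
       except ValueError:
           # Keep the empty dict if the Variable does not hold valid JSON
           pass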
   with DAG(
       dag_id=dag_id,
       default_args=default_args,
       description='General flow',
       schedule_interval=timedelta(minutes=1),
       start_date=datetime(2022, 1, 1),
       # start_date=datetime.now(),
       tags=['lucas', 'common'],
       catchup=False,
       max_active_runs=task_worker_num_dict.get(dag_id, 1),
       # max_active_tasks=5,
   ) as dag:
   
       # Note: this function needs no changes. It must be defined at this level; it returns every task_id downstream of the given task_id
       def next_task_ids(task_id):
           if not task_id:
               return []
           dags = dag.partial_subset(
               task_ids_or_regex=[task_id],  # [yangmingfu_test]
               include_downstream=True,  # True
               include_upstream=False,  # False
           )
           task_ids = [_task_id for _task_id in dags.task_dict if _task_id != task_id]
           return task_ids
   
       # Labeling card task: acquire the task
       send_infer_lock = SimpleHttpOperator(
           task_id='send_infer_lock',
           method='POST',
           endpoint='/api/airflow/lock',
           data=json.dumps({'is_common_task': True, 'instance_id': gen_instance_id(),
                            "dag_id": dag_id, "execution_date": "{{execution_date}}"}),
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check(rps),
           response_filter=lambda rps: http_response_filter(rps, dag_next_task_ids=next_task_ids(send_infer_lock.task_id)),
           dag=dag,
           retries=120,
           retry_delay=timedelta(minutes=2),
       )
   
       # Labeling card task: execute the task
       send_infer_execute = SimpleHttpOperator(
           task_id='send_infer_execute',
           method='POST',
           endpoint='/api/airflow/execute',
           data=send_infer_lock.output,
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check(rps),
           response_filter=lambda rps: http_response_filter(rps),
           dag=dag,
       )
   
       # Labeling card task: sync the result
       send_infer_sync = SimpleHttpOperator(
           task_id='send_infer_sync',
           method='POST',
           endpoint='/api/airflow/sync',
           data=send_infer_execute.output,
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check_sync(rps),
           response_filter=lambda rps: sync_http_response_filter(rps, next_task_type='', business_type=''),
           retries=720,
           retry_delay=timedelta(minutes=6),
           dag=dag,
       )
   
       # Frame-expansion task
       send_infer_lock >> send_infer_execute >> send_infer_sync
   `




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
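URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181567771

   > > Airflow is installed and started through official documents without any modification
   > 
   > You need to explain what you did and what you got: logs, screenshots, evidence of what exactly happens, and what DAG you have.
   > 
   > You probably made a horrible mistake in the DAG you have.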
   
   `with DAG(
       dag_id='common_infer_flow',
       default_args=default_args,
       description='Inference flow',
       schedule_interval=timedelta(minutes=1),
       start_date=datetime(2022, 1, 1),
       # start_date=datetime.now(),
       tags=['lucas', 'common'],
       catchup=False,
       max_active_runs=task_worker_num_dict.get("common_infer_flow", 1),
       # max_active_tasks=5,
   ) as dag:
       # Labeling card task: acquire the task
       send_infer_lock = SimpleHttpOperator(
           task_id='send_infer_lock',
           method='POST',
           endpoint='/api/airflow/lock',
           data=json.dumps({'business_type': business_type, 'task_type': 'common_infer', 'instance_id': gen_instance_id()}),
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check(rps),
           response_filter=lambda rps: http_response_filter(rps),
           dag=dag,
           retries=120,
           retry_delay=timedelta(minutes=2),
       )
   
       # Labeling card task: execute the task
       send_infer_execute = SimpleHttpOperator(
           task_id='send_infer_execute',
           method='POST',
           endpoint='/api/airflow/execute',
           data=send_infer_lock.output,
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check(rps),
           response_filter=lambda rps: http_response_filter(rps),
           dag=dag,
       )
   
       # Labeling card task: sync the result
       send_infer_sync = SimpleHttpOperator(
           task_id='send_infer_sync',
           method='POST',
           endpoint='/api/airflow/sync',
           data=send_infer_execute.output,
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check_sync(rps),
           response_filter=lambda rps: sync_http_response_filter(rps, next_task_type='', business_type=business_type),
           retries=720,
           retry_delay=timedelta(minutes=6),
           dag=dag,
       )
   
       # Frame-expansion task
       send_infer_lock >> send_infer_execute >> send_infer_sync
   `




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181328875

   Fixed a bug where errors are reported in the log when Airflow starts




[GitHub] [airflow] potiuk commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181394960

   Could you please add a better English description? What problem does it solve?




[GitHub] [airflow] potiuk closed pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #24986: Update dag.py
URL: https://github.com/apache/airflow/pull/24986




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181571025

   > Merging is blocked
   No configuration file has been changed, but there is an error saying self.task_dict is None.
   




[GitHub] [airflow] potiuk commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181593913

   Stack trace, details, etc.: what exactly did you do to get there? Please, not in this PR. I will close it. If you can exactly reproduce the issue, open a new Issue; otherwise open a Discussion with all the details. If during the discussion or issue we find that a PR is needed, you can recreate it.




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181555552

   > Airflow is installed and started through official documents without any modification
   It was installed through the following method.
   
   If there is no DAG file in our DAG directory, the log does not report that self.task_dict is empty. If there is a DAG, the log reports an error that self.task_dict is not a dictionary.
   export AIRFLOW_HOME=~/airflow
   
   # Install Airflow using the constraints file
   AIRFLOW_VERSION=2.3.2
   PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
   # For example: 3.7
   CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
   # For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.3.2/constraints-3.7.txt
   pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
   
   # The Standalone command will initialise the database, make a user,
   # and start all components for you.
   airflow standalone
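
For readers who prefer to compute the constraints URL outside the shell, the pattern used in the script above can be reproduced in Python. This is a small sketch only: `constraint_url` is a hypothetical helper name, and the URL pattern is copied from the shell snippet rather than taken from any Airflow API.

```python
import sys


def constraint_url(airflow_version: str, python_version: str) -> str:
    """Build the constraints-file URL with the same pattern as the shell snippet."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


# Major.minor of the running interpreter, for example "3.7"
current_python = f"{sys.version_info.major}.{sys.version_info.minor}"

print(constraint_url("2.3.2", "3.7"))
print(constraint_url("2.3.2", current_python))
```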




[GitHub] [airflow] potiuk commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181543836

   No, you are doing something wrong. self.task_dict is set to an empty dict in the constructor. It is impossible not to have it unless you do something strange. You have not explained what you installed or how you run Airflow, but you are almost certainly doing something very wrong.
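
To illustrate the point, here is a simplified stand-in, not Airflow's actual source: `MiniDag` is a hypothetical class that, like the constructor described above, starts with an empty `task_dict`. Reading the keys of an empty dict is harmless; an error only appears if something later replaces the attribute with `None`.

```python
class MiniDag:
    """Hypothetical, simplified stand-in for a DAG object (not Airflow code)."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        # Set in the constructor, so the attribute always exists and is a dict.
        self.task_dict: dict = {}

    @property
    def task_ids(self) -> list:
        return list(self.task_dict.keys())


dag = MiniDag("example")
print(dag.task_ids)  # no tasks yet, but no error either

# Only an explicit reassignment can break the invariant.
dag.task_dict = None
try:
    dag.task_ids
except AttributeError as err:
    print(f"error after reassignment: {err}")
```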
   




[GitHub] [airflow] potiuk commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181556576

   > Airflow is installed and started through official documents without any modification
   
   You need to explain what you did and what you got: logs, screenshots, evidence of what exactly happens, and what DAG you have.
   
   You probably made a horrible mistake in the DAG you have.
   
   




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181552890

   
   
   Airflow is installed and started through official documents without any modification




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181565945

   > 
   The following is a simple DAG process I configured
   
   `import random
   import time
   import json
   
   from datetime import datetime, timedelta
   from textwrap import dedent
   
   # The DAG object; we'll need this to instantiate a DAG
   from airflow import DAG
   
   # Operators; we need this to operate!
   from airflow.operators.dummy import DummyOperator
   from airflow.operators.bash import BashOperator
   from airflow.operators.python import PythonOperator, BranchPythonOperator
   from airflow.providers.http.operators.http import SimpleHttpOperator
   from airflow.providers.http.sensors.http import HttpSensor
   from airflow.utils.trigger_rule import TriggerRule
   from airflow.models import Variable
   
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['zhanghe@haomo.ai'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   
   
   SYNC_TASK_STATE_RUNNING = 8
   SYNC_TASK_STATE_SUCCESS = 0
   SYNC_TASK_STATE_FAIL = 1
   # 2=failed, 8=running, 3=preparing data, 4=queued, 5=writing results, 9=not executed, 0=succeeded
   SYNC_TASK_STATE_LIST = [3, 4, 5, 8]
   
   
   def http_response_check(rps):
       if not rps.ok:
           return False
   
       rps = rps.json()
       if rps["code"] != 0:
           return False
   
       if not rps["data"]["task_id"]:
           return False
   
       return True
   
   
   def lock_response_check(rps):
       if not rps.ok:
           return False
   
       rps = rps.json()
       if rps["code"] != 0:
           return False
   
       return True
   
   
   def http_response_check_sync(rps):
       """Check whether the task is still running or has finished."""
       if not rps.ok:
           return False
   
       rps = rps.json()
       if rps["code"] != 0:
           return False
   
       if not rps["data"]["task_id"]:
           return False
   
       if int(rps["data"]["task_state"]) in SYNC_TASK_STATE_LIST:
           return False
   
       return True
   
   
   def http_response_filter(rps):
       rps = rps.json()
       data = rps["data"]
       out = dict(task_id=data["task_id"], instance_id=data["instance_id"])
       return json.dumps(out)
   
   
   def sync_http_response_filter(rps, next_task_type, business_type):
       rps = rps.json()
       data = rps["data"]
       out = dict(
           task_state=data["task_state"],
           parent_id=data["task_id"],
           instance_id=data["instance_id"],
           task_type=next_task_type,
           business_type=business_type,
       )
       return json.dumps(out)
   
   
   def gen_instance_id():
       rand = random.randint(10000, 99999)
       return '{ts}{plac}{rand}'.format(
           ts=int(time.time() * 1000000),
           plac="000",
           rand=rand,
       )
   
   
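   # Number of parallel workers per DAG, adjustable through an Airflow Variable
   task_worker_num_dict = {}
   # default_var avoids a KeyError when the Variable has not been created yet
   task_worker_num_str = Variable.get('task_worker_num_dict', default_var=None)
   if task_worker_num_str:
       try:
           task_worker_num_dict = json.loads(task_worker_num_str)
       except ValueError:
           # Keep the empty dict if the Variable does not hold valid JSON
           pass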
   
   with DAG(
       dag_id='common_infer_flow',
       default_args=default_args,
       description='Inference flow',
       schedule_interval=timedelta(minutes=1),
       start_date=datetime(2022, 1, 1),
       # start_date=datetime.now(),
       tags=['lucas', 'common'],
       catchup=False,
       max_active_runs=task_worker_num_dict.get("common_infer_flow", 1),
       # max_active_tasks=5,
   ) as dag:
       # Labeling card task: acquire the task
       send_infer_lock = SimpleHttpOperator(
           task_id='send_infer_lock',
           method='POST',
           endpoint='/api/airflow/lock',
           data=json.dumps({'business_type': business_type, 'task_type': 'common_infer', 'instance_id': gen_instance_id()}),
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check(rps),
           response_filter=lambda rps: http_response_filter(rps),
           dag=dag,
           retries=120,
           retry_delay=timedelta(minutes=2),
       )
   
       # Labeling card task: execute the task
       send_infer_execute = SimpleHttpOperator(
           task_id='send_infer_execute',
           method='POST',
           endpoint='/api/airflow/execute',
           data=send_infer_lock.output,
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check(rps),
           response_filter=lambda rps: http_response_filter(rps),
           dag=dag,
       )
   
       # Labeling card task: sync the result
       send_infer_sync = SimpleHttpOperator(
           task_id='send_infer_sync',
           method='POST',
           endpoint='/api/airflow/sync',
           data=send_infer_execute.output,
           http_conn_id='haomo-airflow',
           headers=headers,
           log_response=True,
           response_check=lambda rps: http_response_check_sync(rps),
           response_filter=lambda rps: sync_http_response_filter(rps, next_task_type='', business_type=business_type),
           retries=720,
           retry_delay=timedelta(minutes=6),
           dag=dag,
       )
   
       # Frame-expansion task
       send_infer_lock >> send_infer_execute >> send_infer_sync
   `




[GitHub] [airflow] potiuk commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181539798

   > When starting a project, an error is reported in the log
   
   When? How? How can it be reproduced? What makes it happen this way? It does not work like that for others, so you must be doing something wrong.
   
   Also, any change we accept here must have corresponding unit tests, so you will have to figure out what causes it, how to get there, and how to reproduce it in a unit test (one that fails without this change). You need to come up with the scenario where it fails anyway.
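
A reproduction of this kind is usually written as a small regression test. The sketch below is illustrative only: `FakeDag` is a hypothetical stand-in rather than Airflow's `DAG`, and the tests merely show the shape of a test that pins down the reported scenario (a DAG with no tasks). Against the real code, such a test would need to fail without the proposed change.

```python
import unittest


class FakeDag:
    """Hypothetical stand-in for the object under test (not Airflow code)."""

    def __init__(self) -> None:
        self.task_dict = {}

    @property
    def task_ids(self):
        return list(self.task_dict.keys())


class TestTaskIds(unittest.TestCase):
    def test_new_dag_has_no_task_ids(self):
        # The scenario from the report: a DAG without any tasks.
        self.assertEqual(FakeDag().task_ids, [])

    def test_task_ids_reflect_added_tasks(self):
        dag = FakeDag()
        dag.task_dict["send_infer_lock"] = object()
        self.assertEqual(dag.task_ids, ["send_infer_lock"])


# Run the tests programmatically so the snippet also works outside a test runner.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestTaskIds)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(result.wasSuccessful())
```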




[GitHub] [airflow] yangmingfu404 commented on pull request #24986: Update dag.py

Posted by GitBox <gi...@apache.org>.
yangmingfu404 commented on PR #24986:
URL: https://github.com/apache/airflow/pull/24986#issuecomment-1181540395

   When I start a project with no file in the DAG directory, no error about self.task_dict being None is reported. But when self.task_dict.keys() is called, self.task_dict is used as a dictionary, and errors are frequently reported in the log.
   
   

