You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/11/04 20:10:12 UTC

[GitHub] [airflow] eitanme commented on pull request #27190: External task sensor fail fix

eitanme commented on PR #27190:
URL: https://github.com/apache/airflow/pull/27190#issuecomment-1304137163

   @eladkal I noticed that the following test failed for the "Tests / Sqlite Py3.7: API Always CLI Core Integration Other Providers WWW (pull_request)" pre-submit job but I'm pretty sure this test has nothing to do with my code. Does that seem right? Is there anything I should be doing? Or do these tests fail intermittently or something?
   
   Thanks for the help!
   
   ```
   =================================== FAILURES ===================================
   _________________ TestLocalTaskJob.test_heartbeat_failed_fast __________________
   
   self = <tests.jobs.test_local_task_job.TestLocalTaskJob object at 0x7f1378e24390>
   
       def test_heartbeat_failed_fast(self):
           """
           Test that task heartbeat will sleep when it fails fast
           """
           self.mock_base_job_sleep.side_effect = time.sleep
           dag_id = "test_heartbeat_failed_fast"
           task_id = "test_heartbeat_failed_fast_op"
           with create_session() as session:
       
               dag_id = "test_heartbeat_failed_fast"
               task_id = "test_heartbeat_failed_fast_op"
               dag = self.dagbag.get_dag(dag_id)
               task = dag.get_task(task_id)
       
               dr = dag.create_dagrun(
                   run_id="test_heartbeat_failed_fast_run",
                   state=State.RUNNING,
                   execution_date=DEFAULT_DATE,
                   start_date=DEFAULT_DATE,
                   session=session,
               )
       
               ti = dr.task_instances[0]
               ti.refresh_from_task(task)
               ti.state = State.QUEUED
               ti.hostname = get_hostname()
               ti.pid = 1
               session.commit()
       
               job = LocalTaskJob(task_instance=ti, executor=MockExecutor(do_update=False))
               job.heartrate = 2
               heartbeat_records = []
               job.heartbeat_callback = lambda session: heartbeat_records.append(job.latest_heartbeat)
               job._execute()
               assert len(heartbeat_records) > 2
               for i in range(1, len(heartbeat_records)):
                   time1 = heartbeat_records[i - 1]
                   time2 = heartbeat_records[i]
                   # Assert that difference small enough
                   delta = (time2 - time1).total_seconds()
   >               assert abs(delta - job.heartrate) < 0.8
   E               assert 1.0317020000000001 < 0.8
   E                +  where 1.0317020000000001 = abs((3.031702 - 2))
   E                +    where 2 = <airflow.jobs.local_task_job.LocalTaskJob object at 0x7f1378e35cd0>.heartrate
   
   tests/jobs/test_local_task_job.py:312: AssertionError
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org