You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/05 03:54:15 UTC
[GitHub] feng-tao closed pull request #3693: [AIRFLOW-2848] Ensure dag_id in
metadata "job" for LocalTaskJob
feng-tao closed pull request #3693: [AIRFLOW-2848] Ensure dag_id in metadata "job" for LocalTaskJob
URL: https://github.com/apache/incubator-airflow/pull/3693
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides the original
diff once it is merged; the diff is reproduced below for the sake of
provenance:
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2f1dec0f08..cc26feee53 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2576,6 +2576,7 @@ def __init__(
pool=None,
*args, **kwargs):
self.task_instance = task_instance
+ self.dag_id = task_instance.dag_id
self.ignore_all_deps = ignore_all_deps
self.ignore_depends_on_past = ignore_depends_on_past
self.ignore_task_deps = ignore_task_deps
diff --git a/tests/jobs.py b/tests/jobs.py
index c701214f1e..fd3a96a4d8 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1046,6 +1046,39 @@ class LocalTaskJobTest(unittest.TestCase):
def setUp(self):
pass
+ def test_localtaskjob_essential_attr(self):
+ """
+ Check whether essential attributes
+ of LocalTaskJob can be assigned with
+ proper values without intervention
+ """
+ dag = DAG(
+ 'test_localtaskjob_essential_attr',
+ start_date=DEFAULT_DATE,
+ default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = DummyOperator(task_id='op1')
+
+ dag.clear()
+ dr = dag.create_dagrun(run_id="test",
+ state=State.SUCCESS,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE)
+ ti = dr.get_task_instance(task_id=op1.task_id)
+
+ job1 = LocalTaskJob(task_instance=ti,
+ ignore_ti_state=True,
+ executor=SequentialExecutor())
+
+ essential_attr = ["dag_id", "job_type", "start_date", "hostname"]
+
+ check_result_1 = [hasattr(job1, attr) for attr in essential_attr]
+ self.assertTrue(all(check_result_1))
+
+ check_result_2 = [getattr(job1, attr) is not None for attr in essential_attr]
+ self.assertTrue(all(check_result_2))
+
@patch('os.getpid')
def test_localtaskjob_heartbeat(self, mock_pid):
session = settings.Session()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services