You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/05 03:54:15 UTC

[GitHub] feng-tao closed pull request #3693: [AIRFLOW-2848] Ensure dag_id in metadata "job" for LocalTaskJob

feng-tao closed pull request #3693: [AIRFLOW-2848] Ensure dag_id in metadata "job" for LocalTaskJob
URL: https://github.com/apache/incubator-airflow/pull/3693
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display it once the pull request is merged):

diff --git a/airflow/jobs.py b/airflow/jobs.py
index 2f1dec0f08..cc26feee53 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -2576,6 +2576,7 @@ def __init__(
             pool=None,
             *args, **kwargs):
         self.task_instance = task_instance
+        self.dag_id = task_instance.dag_id
         self.ignore_all_deps = ignore_all_deps
         self.ignore_depends_on_past = ignore_depends_on_past
         self.ignore_task_deps = ignore_task_deps
diff --git a/tests/jobs.py b/tests/jobs.py
index c701214f1e..fd3a96a4d8 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1046,6 +1046,39 @@ class LocalTaskJobTest(unittest.TestCase):
     def setUp(self):
         pass
 
+    def test_localtaskjob_essential_attr(self):
+        """
+        Check whether essential attributes
+        of LocalTaskJob can be assigned with
+        proper values without intervention
+        """
+        dag = DAG(
+            'test_localtaskjob_essential_attr',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        with dag:
+            op1 = DummyOperator(task_id='op1')
+
+        dag.clear()
+        dr = dag.create_dagrun(run_id="test",
+                               state=State.SUCCESS,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+        ti = dr.get_task_instance(task_id=op1.task_id)
+
+        job1 = LocalTaskJob(task_instance=ti,
+                            ignore_ti_state=True,
+                            executor=SequentialExecutor())
+
+        essential_attr = ["dag_id", "job_type", "start_date", "hostname"]
+
+        check_result_1 = [hasattr(job1, attr) for attr in essential_attr]
+        self.assertTrue(all(check_result_1))
+
+        check_result_2 = [getattr(job1, attr) is not None for attr in essential_attr]
+        self.assertTrue(all(check_result_2))
+
     @patch('os.getpid')
     def test_localtaskjob_heartbeat(self, mock_pid):
         session = settings.Session()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services