You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/10 13:17:36 UTC

incubator-airflow git commit: [AIRFLOW-856] Make sure execution date is set for local client

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 51a3118a3 -> b7c828bf0


[AIRFLOW-856] Make sure execution date is set for local client

In the local api client the execution date was
hardi coded to None.
Secondly, when no execution date was specified the
execution date
was set to datetime.now(). Datetime.now() includes
the fractional seconds
that are supported in the database, but they are
not supported in
a.o. the current logging setup. Now we cut off
fractional seconds for
the execution date.

Closes #2064 from bolkedebruin/AIRFLOW-856


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b7c828bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b7c828bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b7c828bf

Branch: refs/heads/master
Commit: b7c828bf094d3aa1eae310979a82addf7e423bb0
Parents: 51a3118
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Fri Feb 10 14:17:26 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Feb 10 14:17:26 2017 +0100

----------------------------------------------------------------------
 airflow/api/client/local_client.py             |   2 +-
 airflow/api/common/experimental/trigger_dag.py |   9 +-
 tests/__init__.py                              |   1 +
 tests/api/__init__.py                          |  17 ++++
 tests/api/client/__init__.py                   |  13 +++
 tests/api/client/local_client.py               | 107 ++++++++++++++++++++
 6 files changed, 144 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c828bf/airflow/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index a4d1f93..05f27f6 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -21,5 +21,5 @@ class Client(api_client.Client):
         dr = trigger_dag.trigger_dag(dag_id=dag_id,
                                      run_id=run_id,
                                      conf=conf,
-                                     execution_date=None)
+                                     execution_date=execution_date)
         return "Created {}".format(dr)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c828bf/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 0905017..2c5a462 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -12,15 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
+import datetime
 import json
 
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun, DagBag
 from airflow.utils.state import State
 
-import logging
-
 
 def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dagbag = DagBag()
@@ -31,7 +29,10 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dag = dagbag.get_dag(dag_id)
 
     if not execution_date:
-        execution_date = datetime.now()
+        execution_date = datetime.datetime.now()
+
+    assert isinstance(execution_date, datetime.datetime)
+    execution_date = execution_date.replace(microsecond=0)
 
     if not run_id:
         run_id = "manual__{0}".format(execution_date.isoformat())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c828bf/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index e1e8551..7ddf22d 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -14,6 +14,7 @@
 
 from __future__ import absolute_import
 
+from .api import *
 from .configuration import *
 from .contrib import *
 from .core import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c828bf/tests/api/__init__.py
----------------------------------------------------------------------
diff --git a/tests/api/__init__.py b/tests/api/__init__.py
new file mode 100644
index 0000000..2db97ad
--- /dev/null
+++ b/tests/api/__init__.py
@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+from .client import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c828bf/tests/api/client/__init__.py
----------------------------------------------------------------------
diff --git a/tests/api/client/__init__.py b/tests/api/client/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/api/client/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b7c828bf/tests/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/tests/api/client/local_client.py b/tests/api/client/local_client.py
new file mode 100644
index 0000000..a36b71f
--- /dev/null
+++ b/tests/api/client/local_client.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import unittest
+import datetime
+
+from mock import patch
+
+from airflow import AirflowException
+from airflow import models
+
+from airflow.api.client.local_client import Client
+from airflow.utils.state import State
+
+EXECDATE = datetime.datetime.now()
+EXECDATE_NOFRACTIONS = EXECDATE.replace(microsecond=0)
+EXECDATE_ISO = EXECDATE_NOFRACTIONS.isoformat()
+
+real_datetime_class = datetime.datetime
+
+
+def mock_datetime_now(target, dt):
+    class DatetimeSubclassMeta(type):
+        @classmethod
+        def __instancecheck__(mcs, obj):
+            return isinstance(obj, real_datetime_class)
+
+    class BaseMockedDatetime(real_datetime_class):
+        @classmethod
+        def now(cls, tz=None):
+            return target.replace(tzinfo=tz)
+
+        @classmethod
+        def utcnow(cls):
+            return target
+
+    # Python2 & Python3 compatible metaclass
+    MockedDatetime = DatetimeSubclassMeta('datetime', (BaseMockedDatetime,), {})
+
+    return patch.object(dt, 'datetime', MockedDatetime)
+
+
+class TestLocalClient(unittest.TestCase):
+    def setUp(self):
+        self.client = Client(api_base_url=None, auth=None)
+
+    @patch.object(models.DAG, 'create_dagrun')
+    def test_trigger_dag(self, mock):
+        client = self.client
+
+        # non existent
+        with self.assertRaises(AirflowException):
+            client.trigger_dag(dag_id="blablabla")
+
+        import airflow.api.common.experimental.trigger_dag
+        with mock_datetime_now(EXECDATE, airflow.api.common.experimental.trigger_dag.datetime):
+            # no execution date, execution date should be set automatically
+            client.trigger_dag(dag_id="test_start_date_scheduling")
+            mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=None,
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # execution date with microseconds cutoff
+            client.trigger_dag(dag_id="test_start_date_scheduling", execution_date=EXECDATE)
+            mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=None,
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # run id
+            run_id = "my_run_id"
+            client.trigger_dag(dag_id="test_start_date_scheduling", run_id=run_id)
+            mock.assert_called_once_with(run_id=run_id,
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=None,
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # test conf
+            conf = '{"name": "John"}'
+            client.trigger_dag(dag_id="test_start_date_scheduling", conf=conf)
+            mock.assert_called_once_with(run_id="manual__{0}".format(EXECDATE_ISO),
+                                         execution_date=EXECDATE_NOFRACTIONS,
+                                         state=State.RUNNING,
+                                         conf=json.loads(conf),
+                                         external_trigger=True)
+            mock.reset_mock()
+
+            # this is a unit test only, cannot verify existing dag run