You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/02/19 08:16:21 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-863] Example DAGs should have recent start dates
Repository: incubator-airflow
Updated Branches:
refs/heads/v1-8-stable 310fb589a -> 70c79d6c9
[AIRFLOW-863] Example DAGs should have recent start dates
Avoid unnecessary backfills by having start dates of just a few days ago.
Adds a utility function airflow.utils.dates.days_ago().
Closes #2068 from jlowin/example-start-date
(cherry picked from commit bbfd43df4663547abda4ac6fdc3a6ed730a75b57)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/77e310bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/77e310bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/77e310bf
Branch: refs/heads/v1-8-stable
Commit: 77e310bf3410255a252e9ebf569540156b798b82
Parents: 310fb58
Author: Jeremiah Lowin <jl...@apache.org>
Authored: Sun Feb 12 15:37:56 2017 -0500
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:15:20 2017 +0100
----------------------------------------------------------------------
.../example_emr_job_flow_automatic_steps.py | 6 +--
.../example_emr_job_flow_manual_steps.py | 5 ++-
.../example_dags/example_qubole_operator.py | 6 +--
.../contrib/example_dags/example_twitter_dag.py | 5 ++-
airflow/example_dags/example_bash_operator.py | 9 ++--
airflow/example_dags/example_branch_operator.py | 8 ++--
.../example_branch_python_dop_operator_3.py | 5 +--
airflow/example_dags/example_http_operator.py | 7 ++-
airflow/example_dags/example_latest_only.py | 4 +-
.../example_latest_only_with_trigger.py | 4 +-
.../example_passing_params_via_test_command.py | 6 +--
airflow/example_dags/example_python_operator.py | 7 +--
.../example_short_circuit_operator.py | 7 ++-
airflow/example_dags/example_skip_dag.py | 9 ++--
airflow/example_dags/example_subdag_operator.py | 4 +-
airflow/example_dags/example_xcom.py | 9 ++--
airflow/example_dags/test_utils.py | 3 +-
airflow/example_dags/tutorial.py | 7 ++-
airflow/utils/dates.py | 13 ++++++
dags/test_dag.py | 2 +-
scripts/perf/dags/perf_dag_1.py | 7 ++-
scripts/perf/dags/perf_dag_2.py | 8 ++--
tests/utils/__init__.py | 16 +++++++
tests/utils/dates.py | 45 ++++++++++++++++++++
24 files changed, 132 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
index 18399c7..7f57ad1 100644
--- a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
+++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from datetime import timedelta, datetime
-
+from datetime import timedelta
+import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
@@ -21,7 +21,7 @@ from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
- 'start_date': datetime(2016, 3, 13),
+ 'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
index b498d50..caa6943 100644
--- a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
+++ b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
@@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from datetime import timedelta, datetime
+from datetime import timedelta
+import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
@@ -23,7 +24,7 @@ from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTermina
DEFAULT_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
- 'start_date': datetime(2016, 3, 13),
+ 'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index b482cf4..fce0175 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -16,17 +16,15 @@ from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.operators.qubole_operator import QuboleOperator
-from datetime import datetime, timedelta
import filecmp
import random
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
- datetime.min.time())
+
default_args = {
'owner': 'airflow',
'depends_on_past': False,
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2)
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/contrib/example_dags/example_twitter_dag.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py
index d63b4e8..a25c8d0 100644
--- a/airflow/contrib/example_dags/example_twitter_dag.py
+++ b/airflow/contrib/example_dags/example_twitter_dag.py
@@ -22,11 +22,12 @@
# Load The Dependencies
# --------------------------------------------------------------------------------
+import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.hive_operator import HiveOperator
-from datetime import datetime, date, timedelta
+from datetime import date, timedelta
# --------------------------------------------------------------------------------
# Create a few placeholder scripts. In practice these would be different python
@@ -57,7 +58,7 @@ def transfertodb():
default_args = {
'owner': 'Ekhtiar',
'depends_on_past': False,
- 'start_date': datetime(2016, 3, 13),
+ 'start_date': airflow.utils.dates.days_ago(5),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 0d18bcf..6887fa9 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -11,17 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import airflow
from builtins import range
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
-from datetime import datetime, timedelta
+from datetime import timedelta
+
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
- datetime.min.time())
args = {
'owner': 'airflow',
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_branch_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index cc559d0..2b11d91 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -11,17 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
-from datetime import datetime, timedelta
import random
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
- datetime.min.time())
+
args = {
'owner': 'airflow',
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_branch_python_dop_operator_3.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py
index 1dd190e..6da7b68 100644
--- a/airflow/example_dags/example_branch_python_dop_operator_3.py
+++ b/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -13,16 +13,15 @@
# limitations under the License.
#
+import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
-two_days_ago = datetime.combine(datetime.today() - timedelta(2),
- datetime.min.time())
args = {
'owner': 'airflow',
- 'start_date': two_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': True,
}
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 18a67f5..0cc23b9 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -14,19 +14,18 @@
"""
### Example HTTP operator and sensor
"""
+import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
-from datetime import datetime, timedelta
+from datetime import timedelta
import json
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
- datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_latest_only.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 9ce03b9..38ee900 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -16,16 +16,16 @@ Example of the LatestOnlyOperator
"""
import datetime as dt
+import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
-
dag = DAG(
dag_id='latest_only',
schedule_interval=dt.timedelta(hours=4),
- start_date=dt.datetime(2016, 9, 20),
+ start_date=airflow.utils.dates.days_ago(2),
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_latest_only_with_trigger.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index e3a88b7..f2afdcf 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -16,16 +16,16 @@ Example LatestOnlyOperator and TriggerRule interactions
"""
import datetime as dt
+import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
-
dag = DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
- start_date=dt.datetime(2016, 9, 20),
+ start_date=airflow.utils.dates.days_ago(2),
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_passing_params_via_test_command.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index e337f3b..448effb 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -13,15 +13,15 @@
# limitations under the License.
#
-from datetime import datetime, timedelta
-
+from datetime import timedelta
+import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
dag = DAG("example_passing_params_via_test_command",
default_args={"owner": "airflow",
- "start_date":datetime.now()},
+ "start_date": airflow.utils.dates.days_ago(1)},
schedule_interval='*/1 * * * *',
dagrun_timeout=timedelta(minutes=4)
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index c5d7193..8108e1e 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -13,19 +13,16 @@
# limitations under the License.
from __future__ import print_function
from builtins import range
+import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
-from datetime import datetime, timedelta
import time
from pprint import pprint
-seven_days_ago = datetime.combine(
- datetime.today() - timedelta(7), datetime.min.time())
-
args = {
'owner': 'airflow',
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_short_circuit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py
index 92efe99..c9812ac 100644
--- a/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow/example_dags/example_short_circuit_operator.py
@@ -11,17 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import airflow
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
import airflow.utils.helpers
-from datetime import datetime, timedelta
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
- datetime.min.time())
+
args = {
'owner': 'airflow',
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(dag_id='example_short_circuit_operator', default_args=args)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_skip_dag.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index a38b126..b936020 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -12,16 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
-from datetime import datetime, timedelta
from airflow.exceptions import AirflowSkipException
-seven_days_ago = datetime.combine(datetime.today() - timedelta(1),
- datetime.min.time())
+
args = {
'owner': 'airflow',
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2)
}
@@ -53,5 +52,3 @@ def create_test_pipeline(suffix, trigger_rule, dag):
create_test_pipeline('1', 'all_success', dag)
create_test_pipeline('2', 'one_success', dag)
-
-
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py
index b872f43..0c11787 100644
--- a/airflow/example_dags/example_subdag_operator.py
+++ b/airflow/example_dags/example_subdag_operator.py
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from datetime import datetime
+import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
@@ -24,7 +24,7 @@ DAG_NAME = 'example_subdag_operator'
args = {
'owner': 'airflow',
- 'start_date': datetime(2016, 1, 1),
+ 'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/example_xcom.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 50728c3..b41421b 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -12,22 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
+import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
-from datetime import datetime, timedelta
-seven_days_ago = datetime.combine(
- datetime.today() - timedelta(7),
- datetime.min.time())
args = {
'owner': 'airflow',
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2),
'provide_context': True
}
dag = DAG(
'example_xcom',
- start_date=datetime(2015, 1, 1),
schedule_interval="@once",
default_args=args)
@@ -60,6 +56,7 @@ def puller(**kwargs):
v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
assert (v1, v2) == (value_1, value_2)
+
push1 = PythonOperator(
task_id='push', dag=dag, python_callable=push)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/test_utils.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py
index 70391c3..0ed9bdb 100644
--- a/airflow/example_dags/test_utils.py
+++ b/airflow/example_dags/test_utils.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Used for unit tests"""
+import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime
@@ -25,5 +26,5 @@ task = BashOperator(
task_id='sleeps_forever',
dag=dag,
bash_command="sleep 10000000000",
- start_date=datetime(2016, 1, 1),
+ start_date=airflow.utils.dates.days_ago(2),
owner='airflow')
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/example_dags/tutorial.py
----------------------------------------------------------------------
diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py
index c7b2e0f..6ede09a 100644
--- a/airflow/example_dags/tutorial.py
+++ b/airflow/example_dags/tutorial.py
@@ -17,19 +17,18 @@
Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
+import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
-from datetime import datetime, timedelta
+from datetime import timedelta
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
- datetime.min.time())
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
- 'start_date': seven_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/airflow/utils/dates.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py
index 84fd791..f89b20c 100644
--- a/airflow/utils/dates.py
+++ b/airflow/utils/dates.py
@@ -212,3 +212,16 @@ def scale_time_units(time_seconds_arr, unit):
elif unit == 'days':
return list(map(lambda x: x*1.0/(24*60*60), time_seconds_arr))
return time_seconds_arr
+
+
+def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
+ """
+ Get a datetime object representing `n` days ago. By default the time is
+ set to midnight.
+ """
+ today = datetime.today().replace(
+ hour=hour,
+ minute=minute,
+ second=second,
+ microsecond=microsecond)
+ return today - timedelta(days=n)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/dags/test_dag.py
----------------------------------------------------------------------
diff --git a/dags/test_dag.py b/dags/test_dag.py
index a1cbb74..db0b648 100644
--- a/dags/test_dag.py
+++ b/dags/test_dag.py
@@ -24,7 +24,7 @@ DAG_NAME = 'test_dag_v1'
default_args = {
'owner': 'airflow',
'depends_on_past': True,
- 'start_date': START_DATE,
+ 'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/scripts/perf/dags/perf_dag_1.py
----------------------------------------------------------------------
diff --git a/scripts/perf/dags/perf_dag_1.py b/scripts/perf/dags/perf_dag_1.py
index d97c830..fe71303 100644
--- a/scripts/perf/dags/perf_dag_1.py
+++ b/scripts/perf/dags/perf_dag_1.py
@@ -11,15 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
-from datetime import datetime, timedelta
+from datetime import timedelta
-five_days_ago = datetime.combine(datetime.today() - timedelta(5),
- datetime.min.time())
args = {
'owner': 'airflow',
- 'start_date': five_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(3),
}
dag = DAG(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/scripts/perf/dags/perf_dag_2.py
----------------------------------------------------------------------
diff --git a/scripts/perf/dags/perf_dag_2.py b/scripts/perf/dags/perf_dag_2.py
index cccd547..16948d4 100644
--- a/scripts/perf/dags/perf_dag_2.py
+++ b/scripts/perf/dags/perf_dag_2.py
@@ -11,15 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
-from datetime import datetime, timedelta
+from datetime import timedelta
-five_days_ago = datetime.combine(datetime.today() - timedelta(5),
- datetime.min.time())
args = {
'owner': 'airflow',
- 'start_date': five_days_ago,
+ 'start_date': airflow.utils.dates.days_ago(3),
}
dag = DAG(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/tests/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
new file mode 100644
index 0000000..6b15998
--- /dev/null
+++ b/tests/utils/__init__.py
@@ -0,0 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .compression import *
+from .dates import *
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77e310bf/tests/utils/dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/dates.py b/tests/utils/dates.py
new file mode 100644
index 0000000..dc0c87e
--- /dev/null
+++ b/tests/utils/dates.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+import unittest
+
+from airflow.utils import dates
+
+class Dates(unittest.TestCase):
+
+ def test_days_ago(self):
+ today = datetime.today()
+ today_midnight = datetime.fromordinal(today.date().toordinal())
+
+ self.assertTrue(dates.days_ago(0) == today_midnight)
+
+ self.assertTrue(
+ dates.days_ago(100) == today_midnight + timedelta(days=-100))
+
+ self.assertTrue(
+ dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3))
+ self.assertTrue(
+ dates.days_ago(0, minute=3)
+ == today_midnight + timedelta(minutes=3))
+ self.assertTrue(
+ dates.days_ago(0, second=3)
+ == today_midnight + timedelta(seconds=3))
+ self.assertTrue(
+ dates.days_ago(0, microsecond=3)
+ == today_midnight + timedelta(microseconds=3))
+
+
+if __name__ == '__main__':
+ unittest.main()
[2/2] incubator-airflow git commit: 1.8.0 does not have compression tests
Posted by bo...@apache.org.
1.8.0 does not have compression tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/70c79d6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/70c79d6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/70c79d6c
Branch: refs/heads/v1-8-stable
Commit: 70c79d6c9d422e0260a2721319f72add8fdabb87
Parents: 77e310b
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sun Feb 19 09:15:58 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sun Feb 19 09:15:58 2017 +0100
----------------------------------------------------------------------
tests/utils/__init__.py | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70c79d6c/tests/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
index 6b15998..a0dd779 100644
--- a/tests/utils/__init__.py
+++ b/tests/utils/__init__.py
@@ -12,5 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from .compression import *
from .dates import *