Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/08 10:40:12 UTC

[GitHub] kaxil closed pull request #4320: [AIRFLOW-3515] Remove the run_duration option

URL: https://github.com/apache/airflow/pull/4320

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/UPDATING.md b/UPDATING.md
index 5a5fce681e..7114dc99c7 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -24,6 +24,10 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+#### Remove run_duration
+
+The `run_duration` option should no longer be used. It was previously used to restart the scheduler from time to time, but the scheduler has become more stable, so using this setting is now discouraged and may cause an inconsistent state.
+
 ### Modification to config file discovery
 
 If the `AIRFLOW_CONFIG` environment variable was not set and the
@@ -33,7 +37,7 @@ will discover its config file using the `$AIRFLOW_CONFIG` and `$AIRFLOW_HOME`
 environment variables rather than checking for the presence of a file.
 
 ### Modification to `ts_nodash` macro
-`ts_nodash` previously contained TimeZone information alongwith execution date. For Example: `20150101T000000+0000`. This is not user-friendly for file or folder names which was a popular use case for `ts_nodash`. Hence this behavior has been changed and using `ts_nodash` will no longer contain TimeZone information, restoring the pre-1.10 behavior of this macro. And a new macro `ts_nodash_with_tz` has been added which can be used to get a string with execution date and timezone info without dashes. 
+`ts_nodash` previously contained timezone information along with the execution date, for example `20150101T000000+0000`. This is not user-friendly for file or folder names, which was a popular use case for `ts_nodash`. Hence this behavior has been changed: `ts_nodash` no longer contains timezone information, restoring the pre-1.10 behavior of this macro. A new macro, `ts_nodash_with_tz`, has been added; it returns a string with the execution date and timezone info, without dashes.
 
 Examples:
   * `ts_nodash`: `20150101T000000`
@@ -206,7 +210,7 @@ There are five roles created for Airflow by default: Admin, User, Op, Viewer, an
 - All ModelViews in Flask-AppBuilder follow a different pattern from Flask-Admin. The `/admin` part of the URL path will no longer exist. For example: `/admin/connection` becomes `/connection/list`, `/admin/connection/new` becomes `/connection/add`, `/admin/connection/edit` becomes `/connection/edit`, etc.
 - Due to security concerns, the new webserver will no longer support the features in the `Data Profiling` menu of old UI, including `Ad Hoc Query`, `Charts`, and `Known Events`.
 - HiveServer2Hook.get_results() always returns a list of tuples, even when a single column is queried, as per Python API 2.
-- **UTC is now the default timezone**: Either reconfigure your workflows scheduling in UTC or set `default_timezone` as explained in https://airflow.apache.org/timezone.html#default-time-zone 
+- **UTC is now the default timezone**: Either reconfigure your workflows scheduling in UTC or set `default_timezone` as explained in https://airflow.apache.org/timezone.html#default-time-zone
 
 ### airflow.contrib.sensors.hdfs_sensors renamed to airflow.contrib.sensors.hdfs_sensor
 
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 877bf34e20..3f3bdc3ad0 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -970,7 +970,6 @@ def scheduler(args):
     job = jobs.SchedulerJob(
         dag_id=args.dag_id,
         subdir=process_subdir(args.subdir),
-        run_duration=args.run_duration,
         num_runs=args.num_runs,
         do_pickle=args.do_pickle)
 
@@ -1768,10 +1767,6 @@ class CLIFactory(object):
                  "stderr."),
         # scheduler
         'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
-        'run_duration': Arg(
-            ("-r", "--run-duration"),
-            default=None, type=int,
-            help="Set number of seconds to execute before exiting"),
         'num_runs': Arg(
             ("-n", "--num_runs"),
             default=-1, type=int,
@@ -2057,7 +2052,7 @@ class CLIFactory(object):
         }, {
             'func': scheduler,
             'help': "Start a scheduler instance",
-            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
+            'args': ('dag_id_opt', 'subdir', 'num_runs',
                      'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
                      'log_file'),
         }, {
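For orientation, the `Arg` entries above are declarative specs that
`CLIFactory` later feeds to argparse. A rough, self-contained sketch of
that wiring (Airflow's actual `Arg` and factory differ; the names here
are illustrative only):

    import argparse
    from collections import namedtuple

    # Simplified stand-in for airflow.bin.cli.Arg
    Arg = namedtuple('Arg', ['flags', 'kwargs'])

    ARGS = {
        'dag_id_opt': Arg(("-d", "--dag_id"),
                          {'help': "The id of the dag to run"}),
        # 'run_duration' is intentionally absent after this change
        'num_runs': Arg(("-n", "--num_runs"),
                        {'default': -1, 'type': int,
                         'help': "Number of runs to execute before exiting"}),
    }

    parser = argparse.ArgumentParser(prog='airflow scheduler')
    for name in ('dag_id_opt', 'num_runs'):
        spec = ARGS[name]
        parser.add_argument(*spec.flags, **spec.kwargs)

    print(parser.parse_args(['--num_runs', '5']).num_runs)  # -> 5
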
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index b924c6ecb0..99c08908fc 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -456,10 +456,6 @@ job_heartbeat_sec = 5
 # how often the scheduler should run (in seconds).
 scheduler_heartbeat_sec = 5
 
-# after how much time should the scheduler terminate in seconds
-# -1 indicates to run continuously (see also num_runs)
-run_duration = -1
-
 # after how much time (seconds) a new DAGs should be picked up from the filesystem
 min_file_process_interval = 0
 
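Since the option disappears from the template, a pre-existing
airflow.cfg that still sets it is simply stale. A hedged migration
check (not part of the PR; the config path is an assumption):

    # Flag a legacy config that still carries the removed option.
    import configparser

    cfg = configparser.ConfigParser()
    cfg.read('airflow.cfg')  # adjust to your deployment's config path
    if cfg.has_option('scheduler', 'run_duration'):
        print('warning: [scheduler] run_duration has been removed; drop it '
              'and supervise the scheduler externally (or use --num_runs)')
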
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f71fa3cd63..8771405c48 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -545,7 +545,6 @@ def __init__(
             subdir=settings.DAGS_FOLDER,
             num_runs=-1,
             processor_poll_interval=1.0,
-            run_duration=None,
             do_pickle=False,
             log=None,
             *args, **kwargs):
@@ -563,8 +562,6 @@ def __init__(
         :param processor_poll_interval: The number of seconds to wait between
             polls of running processors
         :type processor_poll_interval: int
-        :param run_duration: how long to run (in seconds) before exiting
-        :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
             file, whether to serialize the DAG object to the DB
         :type do_pickle: bool
@@ -578,7 +575,6 @@ def __init__(
         self.subdir = subdir
 
         self.num_runs = num_runs
-        self.run_duration = run_duration
         self._processor_poll_interval = processor_poll_interval
 
         self.do_pickle = do_pickle
@@ -595,10 +591,6 @@ def __init__(
             self.using_sqlite = True
 
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
-        if run_duration is None:
-            self.run_duration = conf.getint('scheduler',
-                                            'run_duration')
-
         self.processor_agent = None
         self._last_loop = False
 
@@ -1499,7 +1491,6 @@ def _execute(self):
                 (executors.LocalExecutor, executors.SequentialExecutor):
             pickle_dags = True
 
-        self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
 
         # Build up a list of Python files that could contain DAGs
@@ -1562,8 +1553,7 @@ def _execute_helper(self):
         last_self_heartbeat_time = timezone.utcnow()
 
         # For the execute duration, parse and schedule DAGs
-        while (timezone.utcnow() - execute_start_time).total_seconds() < \
-                self.run_duration or self.run_duration < 0:
+        while True:
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
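The behavioral core of the PR is this hunk: `_execute_helper` no longer
bounds the loop by wall-clock time; it spins until another condition
(e.g. the `num_runs`-driven `_last_loop` machinery) breaks out. An
illustrative before/after sketch, not Airflow code:

    import time
    from datetime import datetime

    def old_style_loop(run_duration):
        # pre-change: bounded by wall time; a negative bound meant forever
        start = datetime.utcnow()
        while ((datetime.utcnow() - start).total_seconds() < run_duration
               or run_duration < 0):
            time.sleep(0.1)  # stand-in for one scheduling pass

    def new_style_loop(should_stop):
        # post-change: unbounded loop; exit is an explicit condition
        while True:
            time.sleep(0.1)  # stand-in for one scheduling pass
            if should_stop():
                break

    # e.g. stop after three passes, mimicking a num_runs-style bound
    state = {'passes': 0}
    def three_passes():
        state['passes'] += 1
        return state['passes'] >= 3
    new_style_loop(three_passes)
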
diff --git a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
index 7761a6bcdc..4137140709 100644
--- a/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
+++ b/scripts/ci/kubernetes/kube/templates/configmaps.template.yaml
@@ -45,10 +45,6 @@ data:
     # how often the scheduler should run (in seconds).
     scheduler_heartbeat_sec = 5
 
-    # after how much time should the scheduler terminate in seconds
-    # -1 indicates to run continuously (see also num_runs)
-    run_duration = -1
-
     # after how much time a new DAGs should be picked up from the filesystem
     min_file_process_interval = 0
 
diff --git a/tests/test_jobs.py b/tests/test_jobs.py
index 75deba44e9..9094a580b5 100644
--- a/tests/test_jobs.py
+++ b/tests/test_jobs.py
@@ -1988,7 +1988,7 @@ def test_change_state_for_tis_without_dagrun(self):
         session.commit()
 
         dagbag = self._make_simple_dag_bag([dag1, dag2, dag3])
-        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        scheduler = SchedulerJob(num_runs=0)
         scheduler._change_state_for_tis_without_dagrun(
             simple_dag_bag=dagbag,
             old_states=[State.SCHEDULED, State.QUEUED],
@@ -2110,7 +2110,7 @@ def test_execute_helper_reset_orphaned_tasks(self):
 
         processor = mock.MagicMock()
 
-        scheduler = SchedulerJob(num_runs=0, run_duration=0)
+        scheduler = SchedulerJob(num_runs=0)
         executor = TestExecutor()
         scheduler.executor = executor
         scheduler.processor_agent = processor
@@ -3059,30 +3059,6 @@ def test_retry_handling_job(self):
         self.assertEqual(ti.try_number, 2)
         self.assertEqual(ti.state, State.UP_FOR_RETRY)
 
-    def test_scheduler_run_duration(self):
-        """
-        Verifies that the scheduler run duration limit is followed.
-        """
-        dag_id = 'test_start_date_scheduling'
-        dag = self.dagbag.get_dag(dag_id)
-        dag.clear()
-        self.assertTrue(dag.start_date > DEFAULT_DATE)
-
-        expected_run_duration = 5
-        start_time = timezone.utcnow()
-        scheduler = SchedulerJob(dag_id,
-                                 run_duration=expected_run_duration)
-        scheduler.run()
-        end_time = timezone.utcnow()
-
-        run_duration = (end_time - start_time).total_seconds()
-        logging.info("Test ran in %.2fs, expected %.2fs",
-                     run_duration,
-                     expected_run_duration)
-        # 5s to wait for child process to exit, 1s dummy sleep
-        # in scheduler loop to prevent excessive logs and 1s for last loop to finish.
-        self.assertLess(run_duration - expected_run_duration, 6.0)
-
     def test_dag_with_system_exit(self):
         """
         Test to check that a DAG with a system.exit() doesn't break the scheduler.
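
With `test_scheduler_run_duration` removed, the equivalent way to bound
a scheduler under test is by iteration count rather than seconds. A
sketch in the spirit of the surrounding tests (assumes the same test
DAGs as tests/test_jobs.py):

    # Bound the scheduler by iterations instead of wall time.
    from airflow.jobs import SchedulerJob

    scheduler = SchedulerJob(dag_id='test_start_date_scheduling', num_runs=5)
    scheduler.run()  # processes each file at most 5 times, then exits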



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services