Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/27 21:57:31 UTC

[GitHub] [airflow] saguziel opened a new pull request #7269: [AIRFLOW-6651][WIP] Add Redis Heartbeat option

saguziel opened a new pull request #7269: [AIRFLOW-6651][WIP] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269
 
 
   Adds the option to use Redis to store the heartbeat data. This will reduce load on the DB. The heartbeat does not need all the guarantees a DB provides, and Redis is pretty simple to use and common.
   
   Unit tests WIP
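The diffs quoted in the review below call `zrange(cls.__name__, -1, -1)` to fetch the newest entry and a Lua `ZSCORE` loop to read per-job scores, which implies one Redis sorted set per job class. Here is a minimal sketch of that data model. It assumes (the excerpts imply but do not show this) that each member is a job id and its score is the heartbeat's Unix timestamp. To stay runnable without a Redis server, `FakeSortedSet` is a hypothetical in-memory stand-in; with redis-py the equivalent calls would be `zadd`, `zscore`, `zrange` and `zrangebyscore`.

```python
import time
from typing import Dict, List, Optional


class FakeSortedSet:
    """Hypothetical in-memory stand-in for one Redis sorted set.

    Redis orders members by score, breaking ties lexicographically by member.
    """

    def __init__(self) -> None:
        self._scores: Dict[str, float] = {}

    def _ordered(self):
        return sorted(self._scores.items(), key=lambda kv: (kv[1], kv[0]))

    def zadd(self, member: str, score: float) -> None:
        # ZADD overwrites the score of an existing member.
        self._scores[member] = score

    def zscore(self, member: str) -> Optional[float]:
        return self._scores.get(member)

    def zrange_last(self) -> List[str]:
        # Equivalent of ZRANGE key -1 -1: the member with the highest score.
        return [m for m, _ in self._ordered()[-1:]]

    def zrangebyscore(self, min_score: float, max_score: float) -> List[str]:
        # Both bounds are inclusive, as in Redis by default.
        return [m for m, s in self._ordered() if min_score <= s <= max_score]


def heartbeat(zset: FakeSortedSet, job_id: int, now: Optional[float] = None) -> None:
    """Record a heartbeat: one ZADD instead of an UPDATE on the job table."""
    zset.zadd(str(job_id), time.time() if now is None else now)


def stale_jobs(zset: FakeSortedSet, cutoff: float) -> List[str]:
    """Jobs whose last heartbeat is at or before `cutoff` (ZRANGEBYSCORE -inf cutoff)."""
    return zset.zrangebyscore(float("-inf"), cutoff)


jobs = FakeSortedSet()          # one set per job class, e.g. key "LocalTaskJob"
heartbeat(jobs, 1, now=100.0)
heartbeat(jobs, 2, now=150.0)
heartbeat(jobs, 1, now=160.0)   # overwrites job 1's score
print(jobs.zrange_last())       # ['1']
print(stale_jobs(jobs, 155.0))  # ['2']
```

Because each heartbeat only overwrites one score, the write is O(log N) in Redis and never touches the metadata DB's connection pool.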
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] ashb commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-590799186
 
 
   I can see this being useful so long as it does actually reduce DB load/active connections :)

[GitHub] [airflow] kaxil commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r383516404
 
 

 ##########
 File path: airflow/jobs/base_job.py
 ##########
 @@ -21,6 +21,8 @@
 from time import sleep
 from typing import Optional
 
+from pendulum import utcfromtimestamp
+from redis import Redis
 
 Review comment:
   Will this mean a hard requirement on Redis for everyone?
   
   Can we make it an optional import?
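One common way to address this, sketched under assumptions: guard the import so redis-py stays an optional extra, and fail with a clear message only when the feature is actually switched on. `get_heartbeat_client` is a hypothetical helper, not part of the PR; its `enabled` and `url` arguments stand in for the `[heartbeat] redis_enabled` and `redis_url` options the diff reads.

```python
from typing import Any, Optional

try:
    # redis-py is only needed when Redis heartbeats are enabled.
    from redis import Redis
except ImportError:  # the package is an optional dependency
    Redis = None  # type: ignore[assignment,misc]


def get_heartbeat_client(enabled: bool, url: str) -> Optional[Any]:
    """Return a Redis client only if the feature is enabled.

    Raises a clear error at use time, instead of an ImportError when
    base_job is imported, if the option is on but redis is missing.
    """
    if not enabled:
        return None
    if Redis is None:
        raise RuntimeError(
            "Redis heartbeats are enabled but the 'redis' package is not installed"
        )
    # from_url builds a connection pool lazily; it does not connect yet.
    return Redis.from_url(url)
```

With this shape, deployments that never enable the option do not need the package installed at all.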

[GitHub] [airflow] ashb commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-590800040
 
 
   A question: Have you looked to see if this feature is built in to celery? Is it possible we could rely on celery more for this?

[GitHub] [airflow] saguziel commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
saguziel commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-592382747
 
 
   DB load. In one example, load dropped from 9% to 6%, but this is very dependent on the configs used. It would reduce writes by 30-40% on our main cluster, so it would also free up I/O.

[GitHub] [airflow] saguziel commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
saguziel commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r385961367
 
 

 ##########
 File path: airflow/jobs/base_job.py
 ##########
 @@ -308,3 +316,24 @@ def query(result, items):
             len(reset_tis), task_instance_str
         )
         return reset_tis
+
+    @provide_session
 
 Review comment:
   Actually, on second thought, this is fine. The session is always passed in from the heartbeat, and there is already one there.

[GitHub] [airflow] saguziel commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
saguziel commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-591116708
 
 
   Celery does not support this type of thing, and it would not be portable across executors. Celery's consistency model is really simple, which is why it gets really messy with the task_acks_late arg and the fair scheduler and whatnot.  I would say Celery is not designed to offer near-exactly-once type semantics which is why we have so many hacks on top of it.

[GitHub] [airflow] codecov-io commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-589899724
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=h1) Report
   > Merging [#7269](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/6cd37da0fd3f5a6966bd321db63123da4abc5a54?src=pr&el=desc) will **increase** coverage by `0.11%`.
   > The diff coverage is `91.3%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7269/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #7269      +/-   ##
   =========================================
   + Coverage   86.59%   86.7%   +0.11%     
   =========================================
     Files         882     887       +5     
     Lines       41600   42018     +418     
   =========================================
   + Hits        36022   36431     +409     
   - Misses       5578    5587       +9
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.81% <ø> (+0.58%)` | :arrow_up: |
   | [airflow/www/views.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `76.23% <100%> (+0.01%)` | :arrow_up: |
   | [airflow/jobs/local\_task\_job.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL2xvY2FsX3Rhc2tfam9iLnB5) | `92.52% <100%> (+2.39%)` | :arrow_up: |
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `87.81% <100%> (-0.31%)` | :arrow_down: |
   | [airflow/jobs/base\_job.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL2Jhc2Vfam9iLnB5) | `89.02% <82.85%> (-2.59%)` | :arrow_down: |
   | [airflow/models/dagbag.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnYmFnLnB5) | `89.56% <0%> (-0.99%)` | :arrow_down: |
   | [airflow/ti\_deps/deps/trigger\_rule\_dep.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvdHJpZ2dlcl9ydWxlX2RlcC5weQ==) | `90.9% <0%> (-0.35%)` | :arrow_down: |
   | [airflow/providers/amazon/aws/operators/ecs.py](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9vcGVyYXRvcnMvZWNzLnB5) | `85.43% <0%> (-0.15%)` | :arrow_down: |
   | ... and [29 more](https://codecov.io/gh/apache/airflow/pull/7269/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=footer). Last update [6cd37da...d0bf4ee](https://codecov.io/gh/apache/airflow/pull/7269?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [airflow] saguziel commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
saguziel commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r383526519
 
 

 ##########
 File path: airflow/jobs/base_job.py
 ##########
 @@ -102,7 +112,16 @@ def most_recent_job(cls, session=None) -> Optional['BaseJob']:
         :param session: Database session
         :rtype: BaseJob or None
         """
-        return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()
+        latest_job_sql = session.query(cls).order_by(cls._legacy_heartbeat.desc()).limit(1).first()
+        if conf.getboolean('heartbeat', 'redis_enabled'):
+            latest_id = cls.redis.zrange(cls.__name__, -1, -1)
 
 Review comment:
   This is only for reads. I'll add a clarifying comment, but the gist is that the first heartbeat has no good place to be written without a race condition, since `.id` is unavailable in `__init__` and only becomes available after the session is flushed. So the legacy heartbeat should only be written at creation time. This is more or less the same as start_time.
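Given that split (the DB column is written once at creation, Redis holds every later beat), a read path has to merge the two sources. The following is one plausible way to do it, offered as a hypothetical sketch rather than the PR's actual `most_recent_job` logic, which the excerpt truncates. It uses the standard library's `datetime.fromtimestamp` instead of the `pendulum.utcfromtimestamp` import seen in the diff.

```python
from datetime import datetime, timezone
from typing import Optional


def effective_heartbeat(db_heartbeat: datetime, redis_score: Optional[float]) -> datetime:
    """Return the latest known heartbeat for a job.

    db_heartbeat: timezone-aware value written to the DB column once, at creation.
    redis_score: Unix timestamp from ZSCORE, or None if the job has not
    heartbeated through Redis yet (e.g. before its id was flushed).
    """
    if redis_score is None:
        return db_heartbeat
    redis_heartbeat = datetime.fromtimestamp(redis_score, tz=timezone.utc)
    # Take the newer of the two so a missing or lagging Redis entry never
    # makes a job look older than its creation time.
    return max(db_heartbeat, redis_heartbeat)
```

Taking the maximum keeps the read correct for a job that was created but has not yet written its first Redis heartbeat.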

[GitHub] [airflow] ashb commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-590563447
 
 
   How much db load does this actually save? This makes the code a lot more complex so...

[GitHub] [airflow] saguziel commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
saguziel commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-592768775
 
 
   BTW, this code is complex, but the complexity is stuck inside a single family of classes, which means it should not pollute the rest of the code.

[GitHub] [airflow] ashb commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r383533275
 
 

 ##########
 File path: airflow/jobs/base_job.py
 ##########
 @@ -66,23 +76,23 @@ class BaseJob(Base, LoggingMixin):
         'polymorphic_identity': 'BaseJob'
     }
 
-    __table_args__ = (
-        Index('job_type_heart', job_type, latest_heartbeat),
-        Index('idx_job_state_heartbeat', state, latest_heartbeat),
-    )
-
     heartrate = conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC')
+    if conf.getboolean('heartbeat', 'redis_enabled'):
+        redis = Redis.from_url(conf.get('heartbeat', 'redis_url'))
+    else:
+        redis = None
 
     def __init__(
             self,
             executor=None,
             heartrate=None,
-            *args, **kwargs):
+            *args,
+            **kwargs):
         self.hostname = get_hostname()
         self.executor = executor or ExecutorLoader.get_default_executor()
         self.executor_class = executor.__class__.__name__
         self.start_date = timezone.utcnow()
-        self.latest_heartbeat = timezone.utcnow()
+        self._legacy_heartbeat = timezone.utcnow()
 
 Review comment:
   Let's not call it "legacy" just because a user hasn't enabled it. It's not legacy, and not going to be removed.

[GitHub] [airflow] kaxil commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r383517536
 
 

 ##########
 File path: airflow/jobs/base_job.py
 ##########
 @@ -102,7 +112,16 @@ def most_recent_job(cls, session=None) -> Optional['BaseJob']:
         :param session: Database session
         :rtype: BaseJob or None
         """
-        return session.query(cls).order_by(cls.latest_heartbeat.desc()).limit(1).first()
+        latest_job_sql = session.query(cls).order_by(cls._legacy_heartbeat.desc()).limit(1).first()
+        if conf.getboolean('heartbeat', 'redis_enabled'):
+            latest_id = cls.redis.zrange(cls.__name__, -1, -1)
 
 Review comment:
   Will this mean that the heartbeat will be written at both places? Redis and the metadata db?

[GitHub] [airflow] stale[bot] commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-613154209
 
 
   This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
   

[GitHub] [airflow] KevinYang21 commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
KevinYang21 commented on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-590627923
 
 
   @ashb Good question. I don't have an exact performance A/B test result for this change (we're still building that framework). But to give a bit more context on the motivation: in all of our past DB incidents, despite a variety of root causes (e.g. high CPU/RAM usage caused by a bad query or a spiky query load), connections from heartbeats always worsened the issue and ended up bringing down the DB with a connection flood. When reviewing the incidents, many of the root causes turned out not to be severe enough to bring down the DB by themselves, but they slowed down heartbeats, and the heartbeats brought down the DB.
   
   We have already done a lot of query optimization, which is awesome. We believe this change would give us a lot more headroom operating the metadata DB, and I would argue that the added complexity is quite affordable for the value.

[GitHub] [airflow] ashb commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r383534431
 
 

 ##########
 File path: airflow/jobs/base_job.py
 ##########
 @@ -308,3 +316,24 @@ def query(result, items):
             len(reset_tis), task_instance_str
         )
         return reset_tis
+
+    @provide_session
 
 Review comment:
   Using this decorator still takes a connection.
   
   Using `with create_session` would be better.
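To illustrate the trade-off discussed in this thread, here is a simplified re-implementation of the decorator pattern. It is not Airflow's actual `provide_session`/`create_session` code: it only checks the `session` keyword argument, and `FakeSession` is a hypothetical stand-in that records every session opened. The decorator opens a new session, and therefore takes a pooled connection, whenever the caller does not pass one; an explicit `with create_session()` block makes the scope visible and lets several calls share one session, which is also the point made above about the heartbeat already passing its session in.

```python
import contextlib
import functools
from typing import Iterator, List

OPENED: List["FakeSession"] = []  # every simulated session, to count connections


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def __init__(self) -> None:
        self.closed = False
        OPENED.append(self)

    def commit(self) -> None:
        pass

    def rollback(self) -> None:
        pass

    def close(self) -> None:
        self.closed = True


@contextlib.contextmanager
def create_session() -> Iterator[FakeSession]:
    """Open a session, commit on success, roll back on error, always close."""
    session = FakeSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


def provide_session(func):
    """Inject a fresh session unless the caller passed one as `session=`."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if kwargs.get("session") is not None:
            return func(*args, **kwargs)
        with create_session() as session:
            kwargs["session"] = session
            return func(*args, **kwargs)
    return wrapper


@provide_session
def record_heartbeat(job_id: int, session=None) -> int:
    return job_id


record_heartbeat(1)                  # decorator opens (and closes) its own session
with create_session() as session:    # one explicit scope shared by both calls
    record_heartbeat(2, session=session)
    record_heartbeat(3, session=session)
print(len(OPENED))                   # 2
```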

[GitHub] [airflow] ashb edited a comment on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on issue #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#issuecomment-590563447
 
 
   How much db load does this actually save? This makes the code a lot more complex so... Can we see performance test numbers?

[GitHub] [airflow] ashb commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7269: [AIRFLOW-6651] Add Redis Heartbeat option
URL: https://github.com/apache/airflow/pull/7269#discussion_r383535134
 
 

 ##########
 File path: airflow/jobs/local_task_job.py
 ##########
 @@ -161,3 +166,55 @@ def heartbeat_callback(self, session=None):
                 ti.task.on_success_callback(context)
             self.task_runner.terminate()
             self.terminating = True
+
+    @classmethod
+    def get_zombie_running_tis(cls, limit_dttm, session=None):
+        def batch_get(acc, items):
+            # Redis has no batch functionality for zscore
+            # see https://github.com/antirez/redis/issues/2344
+            lua_script = '''
+                local res = {}
+                while #ARGV > 0 do
+                    res[#res+1] = redis.call('ZSCORE', KEYS[1], table.remove(ARGV, 1))
+                end
+                return res
+            '''
+            from airflow.jobs import LocalTaskJob
+            return acc + cls.redis.register_script(lua_script)(keys=[LocalTaskJob.__name__], args=items)
+
+        TI = airflow.models.TaskInstance
+
+        if conf.getboolean('heartbeat', 'redis_enabled'):
 
 Review comment:
   `getboolean` is a relatively expensive call; we should avoid calling it repeatedly.
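A minimal sketch of this suggestion, assuming the option does not change while the process runs: read the flag once and memoize it with `functools.lru_cache`. `read_flag` is a hypothetical stand-in for `conf.getboolean('heartbeat', 'redis_enabled')`; it counts its calls so the caching is observable.

```python
import functools

CALLS = {"n": 0}  # how many times the underlying config lookup ran


def read_flag() -> bool:
    # Hypothetical stand-in for conf.getboolean('heartbeat', 'redis_enabled').
    CALLS["n"] += 1
    return True


@functools.lru_cache(maxsize=None)
def redis_heartbeat_enabled() -> bool:
    """Return the flag, performing the config lookup only on the first call."""
    return read_flag()
```

Hot paths such as `get_zombie_running_tis` and `most_recent_job` would then call `redis_heartbeat_enabled()` instead of re-parsing the config. Tests that toggle the option would need to call `redis_heartbeat_enabled.cache_clear()` between cases.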
