You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/14 08:50:57 UTC

[GitHub] [airflow] ashb commented on a change in pull request #16953: Feature: Added Task Notes

ashb commented on a change in pull request #16953:
URL: https://github.com/apache/airflow/pull/16953#discussion_r669412887



##########
File path: airflow/models/task_note.py
##########
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import Any, Iterable, Optional, Union
+
+import pendulum
+from sqlalchemy import Column, String, Text, and_
+from sqlalchemy.orm import Query, Session
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.utils.helpers import is_container
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class TaskNote(Base):
+    """Model that stores a note for a task id."""
+
+    __tablename__ = "task_notes"
+
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    timestamp = Column(UtcDateTime, primary_key=True)
+    user_name = Column(String(ID_LEN))
+    task_note = Column(Text())
+
+    def __repr__(self):
+        return str(self.__key())
+
+    def __key(self):
+        return self.dag_id, self.task_id, self.execution_date, self.timestamp, self.user_name, self.task_note
+
+    def __hash__(self):
+        return hash(self.__key())
+
+    def __eq__(self, other):
+        if isinstance(other, TaskNote):
+            return self.__key() == other.__key()
+        return False

Review comment:
       Do we need these? Doesn't SQLA Base give us some of these already?

##########
File path: airflow/migrations/versions/e3a246e0dc1_current_schema.py
##########
@@ -220,6 +220,18 @@ def upgrade():
             sa.PrimaryKeyConstraint('id'),
         )
 
+    if 'task_notes' not in tables:
+        op.create_table(
+            'task_notes',
+            sa.Column('task_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
+            sa.Column('dag_id', sa.String(length=250, **COLLATION_ARGS), nullable=False),
+            sa.Column('execution_date', sa.DateTime(), nullable=False),
+            sa.Column('timestamp', sa.DateTime(), nullable=False),
+            sa.Column('user_name', sa.String(length=250), nullable=True),
+            sa.Column('task_note', sa.Text(), nullable=True),
+            sa.PrimaryKeyConstraint('task_id', 'dag_id', 'execution_date', 'timestamp'),
+        )
+

Review comment:
       You will need to create a new migration instead of editing the existing one -- when upgrading existing installs, old migrations are not re-run.

##########
File path: airflow/models/task_note.py
##########
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import Any, Iterable, Optional, Union
+
+import pendulum
+from sqlalchemy import Column, String, Text, and_
+from sqlalchemy.orm import Query, Session
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.utils.helpers import is_container
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class TaskNote(Base):
+    """Model that stores a note for a task id."""
+
+    __tablename__ = "task_notes"
+
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    timestamp = Column(UtcDateTime, primary_key=True)
+    user_name = Column(String(ID_LEN))
+    task_note = Column(Text())
+
+    def __repr__(self):
+        return str(self.__key())
+
+    def __key(self):
+        return self.dag_id, self.task_id, self.execution_date, self.timestamp, self.user_name, self.task_note
+
+    def __hash__(self):
+        return hash(self.__key())
+
+    def __eq__(self, other):
+        if isinstance(other, TaskNote):
+            return self.__key() == other.__key()
+        return False
+
+    @classmethod
+    @provide_session
+    def set(cls, task_note, timestamp, user_name, execution_date, task_id, dag_id, session=None):
+        """
+        Store a TaskNote
+        :return: None
+        """
+        session.expunge_all()
+
+        # remove any duplicate TaskNote
+        session.query(cls).filter(
+            cls.execution_date == execution_date,
+            cls.user_name == user_name,
+            cls.task_id == task_id,
+            cls.dag_id == dag_id,
+            cls.timestamp == timestamp,
+        ).delete()
+
+        # insert new TaskNote
+        session.add(
+            TaskNote(
+                timestamp=timestamp,
+                task_note=task_note,
+                user_name=user_name,
+                execution_date=execution_date,
+                task_id=task_id,
+                dag_id=dag_id,
+            )
+        )
+
+        session.commit()
+
+    @classmethod
+    @provide_session
+    def delete(cls, notes, session=None):
+        """Delete TaskNote"""
+        if isinstance(notes, TaskNote):
+            notes = [notes]
+        for note in notes:
+            if not isinstance(note, TaskNote):
+                raise TypeError(f'Expected TaskNote; received {note.__class__.__name__}')
+            session.delete(note)
+        session.commit()
+
+    @classmethod
+    @provide_session
+    def get_one(
+        cls,
+        execution_date: pendulum.DateTime,
+        timestamp: pendulum.DateTime,
+        user_names: Optional[Union[str, Iterable[str]]] = None,
+        task_ids: Optional[Union[str, Iterable[str]]] = None,
+        dag_ids: Optional[Union[str, Iterable[str]]] = None,
+        session: Session = None,
+    ) -> Optional[Any]:
+        """
+        Retrieve a TaskNote value, optionally meeting certain criteria. Returns None
+        of there are no results.
+
+        :param execution_date: Execution date for the task
+        :type execution_date: pendulum.datetime
+        :param task_ids: Only TaskNotes from task with matching id will be
+            pulled. Can pass None to remove the filter.
+        :type task_ids: str
+        :param dag_ids: If provided, only pulls TaskNote from this DAG.
+            If None (default), the DAG of the calling task is used.
+        :type dag_ids str or iterable of strings (representing dag ids)
+        :param timestamp: Timestamp of the TaskNote creation
+        :type timestamp: pendulum.datetime
+        :param user_names: If provided, only pulls TaskNote from these users
+        :type user_names: str or iterable of strings (representing usernames)
+        :type dag_ids: str
+        :param session: database session
+        :type session: sqlalchemy.orm.session.Session
+        """
+        return cls.get_many(
+            execution_date=execution_date,
+            timestamp=timestamp,
+            user_names=user_names,
+            task_ids=task_ids,
+            dag_ids=dag_ids,
+            session=session,
+            limit=1,

Review comment:
       ```suggestion
   ```
   
   `first()` does `limit 1` for us

##########
File path: airflow/www/views.py
##########
@@ -1354,6 +1356,82 @@ def xcom(self, session=None):
             title=title,
         )
 
+    @expose('/task_note', methods=['POST', 'GET'])
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_NOTE),
+        ]
+    )
+    @action_logging
+    @provide_session
+    def task_note(self, session=None):
+        """Retrieve and store task notes"""
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = timezone.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance
+        request_note = request.values.get('note')
+        default_user = "Anonymous"
+        title = "Notes"
+
+        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
+
+        ti = (
+            session.query(ti_db)
+            .filter(and_(ti_db.dag_id == dag_id, ti_db.task_id == task_id, ti_db.execution_date == dttm))
+            .first()
+        )
+
+        if not ti:
+            flash(f"Task [{dag_id}.{task_id}.{execution_date}] doesn't seem to exist at the moment", "error")
+            return redirect(url_for('Airflow.index'))
+
+        can_add_note = current_app.appbuilder.sm.has_access(
+            permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_NOTE
+        )
+
+        if request.method == 'GET':
+            notes = (
+                TaskNote.get_many(dag_ids=dag_id, task_ids=task_id, execution_date=dttm)
+                .order_by(TaskNote.timestamp.asc())
+                .all()
+            )
+
+            attributes = [(note.task_note, note.timestamp, note.user_name) for note in notes]
+
+            return self.render_template(
+                'airflow/task_notes.html',
+                attributes=attributes,
+                task_id=task_id,
+                execution_date=execution_date,
+                form=form,
+                can_add_note=can_add_note,
+                root=root,
+                dag=dag,
+                title=title,
+            )
+
+        if request.method == 'POST':
+            if not can_add_note:
+                flash("Current user cannot add notes")
+            else:
+                TaskNote.set(

Review comment:
       If you just submit an empty form (i.e. no note) it would still be written to the DB -- you should add some validation of the input.

##########
File path: airflow/www/views.py
##########
@@ -1354,6 +1356,82 @@ def xcom(self, session=None):
             title=title,
         )
 
+    @expose('/task_note', methods=['POST', 'GET'])

Review comment:
       If you are creating a TaskNoteModelView then we shouldn't have this method here -- use the model view instead. Having both is wrong.

##########
File path: airflow/www/views.py
##########
@@ -1354,6 +1356,82 @@ def xcom(self, session=None):
             title=title,
         )
 
+    @expose('/task_note', methods=['POST', 'GET'])
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_NOTE),
+        ]
+    )
+    @action_logging
+    @provide_session
+    def task_note(self, session=None):
+        """Retrieve and store task notes"""
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = timezone.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance
+        request_note = request.values.get('note')
+        default_user = "Anonymous"
+        title = "Notes"
+
+        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
+
+        ti = (
+            session.query(ti_db)
+            .filter(and_(ti_db.dag_id == dag_id, ti_db.task_id == task_id, ti_db.execution_date == dttm))
+            .first()
+        )
+
+        if not ti:
+            flash(f"Task [{dag_id}.{task_id}.{execution_date}] doesn't seem to exist at the moment", "error")
+            return redirect(url_for('Airflow.index'))
+
+        can_add_note = current_app.appbuilder.sm.has_access(
+            permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_NOTE
+        )

Review comment:
       Make the POST handler a separate method, and then let the permissions decorator handle permissions for us.

##########
File path: airflow/www/views.py
##########
@@ -1354,6 +1356,82 @@ def xcom(self, session=None):
             title=title,
         )
 
+    @expose('/task_note', methods=['POST', 'GET'])
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_NOTE),
+        ]
+    )
+    @action_logging
+    @provide_session
+    def task_note(self, session=None):
+        """Retrieve and store task notes"""
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = timezone.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance
+        request_note = request.values.get('note')
+        default_user = "Anonymous"
+        title = "Notes"
+
+        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()

Review comment:
       Is this ever used?

##########
File path: airflow/models/task_note.py
##########
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import Any, Iterable, Optional, Union
+
+import pendulum
+from sqlalchemy import Column, String, Text, and_
+from sqlalchemy.orm import Query, Session
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.utils.helpers import is_container
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class TaskNote(Base):
+    """Model that stores a note for a task id."""
+
+    __tablename__ = "task_notes"
+
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    timestamp = Column(UtcDateTime, primary_key=True)
+    user_name = Column(String(ID_LEN))
+    task_note = Column(Text())
+
+    def __repr__(self):
+        return str(self.__key())
+
+    def __key(self):
+        return self.dag_id, self.task_id, self.execution_date, self.timestamp, self.user_name, self.task_note
+
+    def __hash__(self):
+        return hash(self.__key())
+
+    def __eq__(self, other):
+        if isinstance(other, TaskNote):
+            return self.__key() == other.__key()
+        return False
+
+    @classmethod
+    @provide_session
+    def set(cls, task_note, timestamp, user_name, execution_date, task_id, dag_id, session=None):
+        """
+        Store a TaskNote
+        :return: None
+        """
+        session.expunge_all()

Review comment:
       ```suggestion
   ```

##########
File path: airflow/www/views.py
##########
@@ -1354,6 +1356,82 @@ def xcom(self, session=None):
             title=title,
         )
 
+    @expose('/task_note', methods=['POST', 'GET'])
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_NOTE),
+        ]
+    )
+    @action_logging
+    @provide_session
+    def task_note(self, session=None):
+        """Retrieve and store task notes"""
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = timezone.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance
+        request_note = request.values.get('note')
+        default_user = "Anonymous"
+        title = "Notes"
+
+        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
+
+        ti = (
+            session.query(ti_db)
+            .filter(and_(ti_db.dag_id == dag_id, ti_db.task_id == task_id, ti_db.execution_date == dttm))
+            .first()
+        )
+
+        if not ti:
+            flash(f"Task [{dag_id}.{task_id}.{execution_date}] doesn't seem to exist at the moment", "error")
+            return redirect(url_for('Airflow.index'))
+
+        can_add_note = current_app.appbuilder.sm.has_access(
+            permissions.ACTION_CAN_CREATE, permissions.RESOURCE_TASK_NOTE
+        )
+
+        if request.method == 'GET':
+            notes = (
+                TaskNote.get_many(dag_ids=dag_id, task_ids=task_id, execution_date=dttm)
+                .order_by(TaskNote.timestamp.asc())
+                .all()
+            )
+
+            attributes = [(note.task_note, note.timestamp, note.user_name) for note in notes]
+
+            return self.render_template(
+                'airflow/task_notes.html',
+                attributes=attributes,
+                task_id=task_id,
+                execution_date=execution_date,
+                form=form,
+                can_add_note=can_add_note,
+                root=root,
+                dag=dag,
+                title=title,
+            )
+
+        if request.method == 'POST':
+            if not can_add_note:
+                flash("Current user cannot add notes")
+            else:
+                TaskNote.set(
+                    timestamp=pendulum.now(),
+                    task_note=request_note,
+                    user_name=str(g.user) if g.user else default_user,

Review comment:
       This shouldn't be necessary -- there's always got to be a user here, right?

##########
File path: airflow/www/views.py
##########
@@ -1354,6 +1356,82 @@ def xcom(self, session=None):
             title=title,
         )
 
+    @expose('/task_note', methods=['POST', 'GET'])
+    @auth.has_access(
+        [
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_NOTE),
+        ]
+    )
+    @action_logging
+    @provide_session
+    def task_note(self, session=None):
+        """Retrieve and store task notes"""
+        dag_id = request.args.get('dag_id')
+        task_id = request.args.get('task_id')
+        execution_date = request.args.get('execution_date')
+        dttm = timezone.parse(execution_date)
+        form = DateTimeForm(data={'execution_date': dttm})
+        root = request.args.get('root', '')
+        dm_db = models.DagModel
+        ti_db = models.TaskInstance

Review comment:
       ```suggestion
           DM = models.DagModel
           TI = models.TaskInstance
   ```
   
   to follow the convention

##########
File path: airflow/models/task_note.py
##########
@@ -0,0 +1,219 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import Any, Iterable, Optional, Union
+
+import pendulum
+from sqlalchemy import Column, String, Text, and_
+from sqlalchemy.orm import Query, Session
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.utils.helpers import is_container
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class TaskNote(Base):
+    """Model that stores a note for a task id."""
+
+    __tablename__ = "task_notes"
+
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    timestamp = Column(UtcDateTime, primary_key=True)
+    user_name = Column(String(ID_LEN))
+    task_note = Column(Text())
+
+    def __repr__(self):
+        return str(self.__key())
+
+    def __key(self):
+        return self.dag_id, self.task_id, self.execution_date, self.timestamp, self.user_name, self.task_note
+
+    def __hash__(self):
+        return hash(self.__key())
+
+    def __eq__(self, other):
+        if isinstance(other, TaskNote):
+            return self.__key() == other.__key()
+        return False
+
+    @classmethod
+    @provide_session
+    def set(cls, task_note, timestamp, user_name, execution_date, task_id, dag_id, session=None):
+        """
+        Store a TaskNote
+        :return: None
+        """
+        session.expunge_all()
+
+        # remove any duplicate TaskNote
+        session.query(cls).filter(
+            cls.execution_date == execution_date,
+            cls.user_name == user_name,
+            cls.task_id == task_id,
+            cls.dag_id == dag_id,
+            cls.timestamp == timestamp,
+        ).delete()
+
+        # insert new TaskNote
+        session.add(
+            TaskNote(
+                timestamp=timestamp,
+                task_note=task_note,
+                user_name=user_name,
+                execution_date=execution_date,
+                task_id=task_id,
+                dag_id=dag_id,
+            )
+        )
+
+        session.commit()
+
+    @classmethod
+    @provide_session
+    def delete(cls, notes, session=None):
+        """Delete TaskNote"""
+        if isinstance(notes, TaskNote):
+            notes = [notes]
+        for note in notes:
+            if not isinstance(note, TaskNote):
+                raise TypeError(f'Expected TaskNote; received {note.__class__.__name__}')
+            session.delete(note)
+        session.commit()

Review comment:
       This function doesn't seem all that necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org