Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/14 10:25:25 UTC

[GitHub] [airflow] uranusjr opened a new pull request #20286: Add TaskMap and TaskInstance.map_id

uranusjr opened a new pull request #20286:
URL: https://github.com/apache/airflow/pull/20286


   TaskMap is populated by a TaskInstance when it pushes its return value to XCom.
   
   Mostly for feedback for now. The pushing part needs some additional logic from #19965 to discover downstreams that need mapping.
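
A rough sketch of the kind of information such a record might capture when a return value is pushed. It reuses the `TaskMapVariant` enum from the diff quoted later in this thread; the `describe_mapping` helper and the choice to record a length are assumptions made for illustration, not the PR's actual logic.

```python
import enum
from typing import Any, Optional, Tuple


class TaskMapVariant(enum.Enum):
    """Mirrors the enum in the quoted diff: dict or list."""

    DICT = "dict"
    LIST = "list"


def describe_mapping(value: Any) -> Optional[Tuple[TaskMapVariant, int]]:
    """Hypothetical helper: return (variant, length) for a mappable value.

    Returns None when the value is neither a dict nor a list, meaning
    there would be nothing to record for mapping purposes.
    """
    if isinstance(value, dict):
        return TaskMapVariant.DICT, len(value)
    if isinstance(value, list):
        return TaskMapVariant.LIST, len(value)
    return None
```

Under these assumptions, a scalar return value produces no mapping record at all, while a list or dict produces one record describing its shape.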


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr merged pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #20286:
URL: https://github.com/apache/airflow/pull/20286


   





[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r784165717



##########
File path: airflow/models/taskmap.py
##########
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Table to store "mapped" task instances (AIP-42)."""

Review comment:
       ```suggestion
   """Table to store information about mapped task instances (AIP-42)."""
   ```







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r770097036



##########
File path: airflow/models/taskmap.py
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Table to store "mapped" task instances (AIP-42)."""
+
+import enum
+
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+
+class TaskMapVariant(enum.Enum):
+    """Task map variant.
+
+    Possible values are **dict** (for a key-value mapping) and **list** (for an
+    ordered value sequence).
+    """
+
+    DICT = "dict"
+    LIST = "list"
+
+
+class TaskMap(Base):
+    """Model to track dynamic task-mapping information.
+
+    This is currently only populated by an upstream TaskInstance pushing an
+    XCom that's pulled by a downstream for mapping purposes.
+    """
+
+    __tablename__ = "task_map"
+
+    # Link to upstream TaskInstance creating this dynamic mapping information.
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    run_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    map_id = Column(Integer, primary_key=True)

Review comment:
       Maybe we should call this `map_index`. To me the `id` suffix implies it's an FK to something, but this isn't an ID itself; it's an index into some other list.
   
   







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r785791486



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))

Review comment:
       @uranusjr Did this need `server_default=-1` instead? Re: https://github.com/apache/airflow/issues/20876
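
For context on the question above: SQLAlchemy's `default=` is applied on the Python side when SQLAlchemy builds an INSERT and is not part of the emitted DDL, whereas `server_default=` (typically passed as a string such as `"-1"`) becomes a `DEFAULT` clause in the DDL. The sketch below illustrates why that matters when adding a `NOT NULL` column to a table that already has rows. It uses raw SQL against an in-memory SQLite database rather than Alembic, and the simplified `task_instance` table is an assumption for illustration; other engines may report the failure differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the real table (assumption for illustration).
conn.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('d', 't', 'r')")

# Without a DEFAULT clause in the DDL, the database has no value to give
# the new NOT NULL column, so SQLite refuses the ALTER.
try:
    conn.execute("ALTER TABLE task_instance ADD COLUMN map_index INTEGER NOT NULL")
    added_without_default = True
except sqlite3.OperationalError:
    added_without_default = False

# With a DEFAULT clause, existing rows pick up the default value.
conn.execute(
    "ALTER TABLE task_instance ADD COLUMN map_index INTEGER NOT NULL DEFAULT -1"
)
backfilled = conn.execute("SELECT map_index FROM task_instance").fetchall()
```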







[GitHub] [airflow] uranusjr commented on pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#issuecomment-1011122326


   Alright, modified the code a bit and added a few tests. We can’t put a mapped task in a DAG yet (`bag_dag` kept throwing AttributeError at me for many attributes expected on BaseOperator but not yet present on MappedOperator), so a bit of mocking is needed. Otherwise things seem to work as expected.





[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r770097836



##########
File path: tests/providers/jira/operators/test_jira.py
##########
@@ -39,8 +38,11 @@
 }
 
 
-class TestJiraOperator(unittest.TestCase):
-    def setUp(self):
+class TestJiraOperator:
+    def setup_class(self):
+        clear_db_runs()

Review comment:
       Are these changes needed because of the change in this PR, or are they for something else?







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r780049983



##########
File path: airflow/models/taskmap.py
##########
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Table to store "mapped" task instances (AIP-42)."""
+
+import enum
+
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
+
+from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+
+class TaskMapVariant(enum.Enum):
+    """Task map variant.
+
+    Possible values are **dict** (for a key-value mapping) and **list** (for an
+    ordered value sequence).
+    """
+
+    DICT = "dict"
+    LIST = "list"
+
+
+class TaskMap(Base):
+    """Model to track dynamic task-mapping information.
+
+    This is currently only populated by an upstream TaskInstance pushing an
+    XCom that's pulled by a downstream for mapping purposes.
+    """
+
+    __tablename__ = "task_map"
+
+    # Link to upstream TaskInstance creating this dynamic mapping information.
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    run_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    map_id = Column(Integer, primary_key=True)

Review comment:
       Changed







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r783657603



##########
File path: airflow/models/baseoperator.py
##########
@@ -1632,6 +1632,33 @@ def defer(
     def map(self, **kwargs) -> "MappedOperator":
         return MappedOperator.from_operator(self, kwargs)
 
+    def has_mapped_dependants(self) -> bool:
+        """Whether any downstream dependencies depend on this task for mapping."""
+        from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+        if not self.has_dag():
+            return False
+
+        def _walk_group(group: TaskGroup) -> Iterable[Tuple[str, DAGNode]]:
+            """Recursively walk children in a task group.
+
+            This yields all direct children (including both tasks and task
+            groups), and all children of any task groups.
+            """
+            for key, child in group.children.items():
+                yield key, child
+                if isinstance(child, TaskGroup):
+                    yield from _walk_group(child)
+
+        for key, child in _walk_group(self.dag.task_group):
+            if key == self.task_id:
+                continue
+            if not isinstance(child, (MappedOperator, MappedTaskGroup)):
+                continue
+            if self.task_id in child.upstream_task_ids:
+                return True
+        return False

Review comment:
       Done
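
The recursive walk in the hunk above can be tried in isolation. This is a minimal sketch with stand-in `Task` and `Group` classes (both hypothetical, not Airflow's `BaseOperator` or `TaskGroup`); it only shows the traversal order: each child is yielded first, then the children of any nested group.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, Tuple, Union


@dataclass
class Task:
    """Stand-in for a task node (hypothetical)."""

    task_id: str


@dataclass
class Group:
    """Stand-in for a task group holding tasks and nested groups (hypothetical)."""

    children: Dict[str, Union["Task", "Group"]] = field(default_factory=dict)


Node = Union[Task, Group]


def walk_group(group: Group) -> Iterator[Tuple[str, Node]]:
    """Yield every direct child, then recurse into children that are groups."""
    for key, child in group.children.items():
        yield key, child
        if isinstance(child, Group):
            yield from walk_group(child)


root = Group(
    {
        "a": Task("a"),
        "g": Group({"g.b": Task("g.b"), "g.h": Group({"g.h.c": Task("g.h.c")})}),
    }
)
keys = [key for key, _ in walk_group(root)]
```

Because groups themselves are yielded as well as their members, a caller can filter on either kind of node, which is what the `isinstance` check in the hunk relies on.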







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782776457



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Actually I think this _shouldn’t ever_ be `-1` because `TI.map_index == -1` means the TI is _not_ mapped. Let me check if SQLAlchemy has an unsigned integer column type…

##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Oh, TIL the SQL standard does not define an unsigned integer type!
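
Since there is no standard unsigned integer type, one common way to keep a column non-negative is a `CHECK` constraint. The sketch below is an illustration only, using raw SQL against in-memory SQLite with a simplified `task_map` table; whether the PR adopted such a constraint is not stated in this thread.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the task_map table (assumption for illustration).
conn.execute(
    """
    CREATE TABLE task_map (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        map_index INTEGER NOT NULL CHECK (map_index >= 0),
        PRIMARY KEY (dag_id, task_id, run_id, map_index)
    )
    """
)

# A non-negative index is accepted.
conn.execute("INSERT INTO task_map VALUES ('d', 't', 'r', 0)")

# -1 (the "not mapped" marker on TaskInstance) is rejected by the constraint.
try:
    conn.execute("INSERT INTO task_map VALUES ('d', 't', 'r', -1)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True
```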







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782776457



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Actually I think this _shouldn’t_ be `-1` because `TI.map_index == -1` means the TI is _not_ mapped. Let me check if SQLAlchemy has an unsigned integer column type…







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r785799480



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))

Review comment:
       I’m investigating if it works as expected on all engines (if not I’ll just do an `UPDATE SET map_index = -1` after this line)
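
The fallback mentioned above (an explicit `UPDATE` after adding the column) could look roughly like this. It is a sketch in raw SQL against in-memory SQLite, not the migration's actual code: the table is a simplified stand-in, and the column is added as nullable here so the backfill step is visible. A real migration would still need to tighten the column to `NOT NULL` afterwards.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the real table (assumption for illustration).
conn.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [("d", "t1", "r"), ("d", "t2", "r")],
)

# Step 1: add the column without a default; existing rows get NULL.
conn.execute("ALTER TABLE task_instance ADD COLUMN map_index INTEGER")

# Step 2: backfill every existing row with the "not mapped" marker.
conn.execute("UPDATE task_instance SET map_index = -1")

remaining_nulls = conn.execute(
    "SELECT COUNT(*) FROM task_instance WHERE map_index IS NULL"
).fetchone()[0]
```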







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782844937



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       The default on `TI.map_index` is mainly added so we don’t need to change too much existing code (otherwise we’d need to add a ton of `map_index=-1`), but this being a new class, I think I prefer being explicit instead.







[GitHub] [airflow] uranusjr commented on pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#issuecomment-1008546911


   I think this is ready-ish





[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r781128089



##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2135,28 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    @provide_session
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session = NEW_SESSION) -> None:
+        if not self.task.has_mapped_dependants():
+            return
+        if not isinstance(value, collections.abc.Collection):
+            return  # TODO: Error if the pushed value is not mappable?
+        session.query(TaskMap).filter_by(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            run_id=self.run_id,
+            map_index=self.map_index,
+        ).delete()
+        instance = TaskMap(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            run_id=self.run_id,
+            map_index=self.map_index,
+            length=len(value),
+            keys=(list(value) if isinstance(value, collections.abc.Mapping) else None),

Review comment:
       Maybe it makes sense to move this logic into the constructor for `TaskMap`?
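
One possible shape for that suggestion, as a rough sketch: an alternate constructor that derives `length` and `keys` from the pushed value so the caller no longer has to. `TaskMapSketch` and `from_pushed_value` are hypothetical names, and a plain dataclass stands in for the SQLAlchemy model.

```python
import collections.abc
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class TaskMapSketch:
    """Stand-in for TaskMap; a plain dataclass, not the SQLAlchemy model."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int
    length: int
    keys: Optional[List[Any]]

    @classmethod
    def from_pushed_value(
        cls,
        dag_id: str,
        task_id: str,
        run_id: str,
        map_index: int,
        value: collections.abc.Collection,
    ) -> "TaskMapSketch":
        # Only mappings carry keys; for other collections just the length is kept.
        keys = list(value) if isinstance(value, collections.abc.Mapping) else None
        return cls(dag_id, task_id, run_id, map_index, len(value), keys)
```

With this in place, the call site in `_record_task_map_for_downstreams` would only pass the identifying fields and the value.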







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r772287878



##########
File path: tests/providers/jira/operators/test_jira.py
##########
@@ -39,8 +38,11 @@
 }
 
 
-class TestJiraOperator(unittest.TestCase):
-    def setUp(self):
+class TestJiraOperator:
+    def setup_class(self):
+        clear_db_runs()

Review comment:
       These changes are left over from debugging. I’ll roll them back if they turn out to be unnecessary by the time the PR leaves draft state.







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r780201382



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
+
+from airflow.models.base import COLLATION_ARGS
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        if op.get_bind().dialect.name == "mysql":  # MySQL also creates an index.
+            batch_op.drop_index("task_reschedule_ti_fkey")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", String(250, **COLLATION_ARGS), primary_key=True),

Review comment:
       `StringID` from airflow.models.base
   
   ```suggestion
           Column("dag_id", StringID(), primary_key=True),
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782776457



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Actually I think this _shouldn’t ever_ be `-1` because `TI.map_index == -1` means the TI is _not_ mapped. Let me check if SQLAlchemy has an unsigned integer column type…







[GitHub] [airflow] github-actions[bot] commented on pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#issuecomment-1012355382


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782838839



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Added a CheckConstraint and a check in `__init__` for this instead.
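
A minimal sketch of the two-layer check described here, assuming the constraint is `map_index >= 0` (the comment implies this but does not spell it out). It uses the standard-library `sqlite3` module rather than SQLAlchemy's `CheckConstraint`, and `TaskMapRow`, `create_task_map`, and `insert_raw` are hypothetical names for illustration only.

```python
import sqlite3


class TaskMapRow:
    """Hypothetical stand-in that validates map_index before it reaches the database."""

    def __init__(self, dag_id: str, task_id: str, run_id: str, map_index: int) -> None:
        if map_index < 0:
            raise ValueError(f"map_index must be non-negative, got {map_index}")
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id
        self.map_index = map_index


def create_task_map(conn: sqlite3.Connection) -> None:
    """Create a table whose CHECK clause rejects negative map_index values."""
    conn.execute(
        """
        CREATE TABLE task_map (
            dag_id TEXT NOT NULL,
            task_id TEXT NOT NULL,
            run_id TEXT NOT NULL,
            map_index INTEGER NOT NULL CHECK (map_index >= 0),
            PRIMARY KEY (dag_id, task_id, run_id, map_index)
        )
        """
    )


def insert_raw(conn: sqlite3.Connection, map_index: int) -> bool:
    """Insert while bypassing the Python check; True if the database accepted the row."""
    try:
        conn.execute("INSERT INTO task_map VALUES ('d', 't', 'r', ?)", (map_index,))
    except sqlite3.IntegrityError:
        return False
    return True
```

The Python-side check gives an early, readable error, while the database constraint still guards against writes that bypass the model.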







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r783194609



##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2138,14 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session) -> None:
+        if not self.task.has_mapped_dependants():
+            return
+        if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
+            self.log.info("Failing %s for unmappable XCom push %r", self.key, value)
+            raise UnmappableXComPushed(value)

Review comment:
       Oh good point. Even doing `repr(value)[:100]` can potentially consume a lot of memory, so I guess the only reasonable approach is to not log the value at all. Worst case, the user can manually look up that problematic value in the XCom table.
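
As a rough sketch of that approach: the check below rejects non-collections as well as `str` and `bytes`, and reports only the type of the offending value, never its `repr`. `UnmappableValueError`, `is_mappable`, and `check_mappable` are hypothetical names and not the code in this PR.

```python
import collections.abc
import logging
from typing import Any

log = logging.getLogger(__name__)


class UnmappableValueError(Exception):
    """Hypothetical counterpart to UnmappableXComPushed; keeps only the type name."""

    def __init__(self, value: Any) -> None:
        super().__init__(f"unmappable return type {type(value).__qualname__!r}")


def is_mappable(value: Any) -> bool:
    """Sized, iterable containers qualify; str and bytes are deliberately excluded."""
    return isinstance(value, collections.abc.Collection) and not isinstance(value, (bytes, str))


def check_mappable(task_key: str, value: Any) -> None:
    """Raise if the value cannot be mapped over, logging its type but not its content."""
    if not is_mappable(value):
        # repr(value) would be built in full before any slicing, so only the type is logged.
        log.info("Failing %s for unmappable XCom push of type %s", task_key, type(value).__qualname__)
        raise UnmappableValueError(value)
```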







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r781126481



##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2135,28 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    @provide_session
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session = NEW_SESSION) -> None:
+        if not self.task.has_mapped_dependants():
+            return
+        if not isinstance(value, collections.abc.Collection):
+            return  # TODO: Error if the pushed value is not mappable?

Review comment:
       Yeah, I think this should fail the task.

##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Not that it matters, but for consistency should we have the same default here as we do on TI's map_index column?

##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2135,28 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    @provide_session
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session = NEW_SESSION) -> None:

Review comment:
       ```suggestion
       def _record_task_map_for_downstreams(self, value: Any, *, session: Session) -> None:
   ```
   
   I generally avoid using `provide_session` on internal funcs and favour being explicit about passing the session instead.

##########
File path: airflow/models/baseoperator.py
##########
@@ -1632,6 +1632,33 @@ def defer(
     def map(self, **kwargs) -> "MappedOperator":
         return MappedOperator.from_operator(self, kwargs)
 
+    def has_mapped_dependants(self) -> bool:
+        """Whether any downstream dependencies depend on this task for mapping."""
+        from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+        if not self.has_dag():
+            return False
+
+        def _walk_group(group: TaskGroup) -> Iterable[Tuple[str, DAGNode]]:
+            """Recursively walk children in a task group.
+
+            This yields all direct children (including both tasks and task
+            groups), and all children of any task groups.
+            """
+            for key, child in group.children.items():
+                yield key, child
+                if isinstance(child, TaskGroup):
+                    yield from _walk_group(child)
+
+        for key, child in _walk_group(self.dag.task_group):
+            if key == self.task_id:
+                continue
+            if not isinstance(child, (MappedOperator, MappedTaskGroup)):
+                continue
+            if self.task_id in child.upstream_task_ids:
+                return True
+        return False

Review comment:
       Why do we walk the entire dag, rather than looking at `self.downstream_list`?

##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2135,28 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    @provide_session
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session = NEW_SESSION) -> None:
+        if not self.task.has_mapped_dependants():
+            return
+        if not isinstance(value, collections.abc.Collection):
+            return  # TODO: Error if the pushed value is not mappable?
+        session.query(TaskMap).filter_by(
+            dag_id=self.dag_id,
+            task_id=self.task_id,
+            run_id=self.run_id,
+            map_index=self.map_index,
+        ).delete()

Review comment:
       If we use `session.merge`, we don't _have to_ delete the row first, as SQLAlchemy would issue the right INSERT or UPDATE itself.
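
The insert-or-update behaviour being referred to can be sketched roughly as follows. This is not SQLAlchemy's `Session.merge` itself but a hand-written stand-in using the standard-library `sqlite3` module: look the row up by primary key, then update it if present or insert it otherwise. `create_task_map` and `merge_task_map` are hypothetical helpers, and the table layout is an assumption.

```python
import sqlite3


def create_task_map(conn: sqlite3.Connection) -> None:
    """Create a simplified task_map table keyed on the four identifying columns."""
    conn.execute(
        "CREATE TABLE task_map ("
        "dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, length INTEGER, "
        "PRIMARY KEY (dag_id, task_id, run_id, map_index))"
    )


def merge_task_map(
    conn: sqlite3.Connection,
    dag_id: str,
    task_id: str,
    run_id: str,
    map_index: int,
    length: int,
) -> str:
    """Insert the row, or update it when the primary key already exists."""
    key = (dag_id, task_id, run_id, map_index)
    where = "dag_id = ? AND task_id = ? AND run_id = ? AND map_index = ?"
    found = conn.execute(f"SELECT 1 FROM task_map WHERE {where}", key).fetchone()
    if found is None:
        conn.execute("INSERT INTO task_map VALUES (?, ?, ?, ?, ?)", key + (length,))
        return "INSERT"
    conn.execute(f"UPDATE task_map SET length = ? WHERE {where}", (length,) + key)
    return "UPDATE"
```

Either way there is at most one row per key afterwards, which is what the explicit `delete()` before the insert was guaranteeing.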







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r783171485



##########
File path: airflow/models/baseoperator.py
##########
@@ -1632,6 +1632,33 @@ def defer(
     def map(self, **kwargs) -> "MappedOperator":
         return MappedOperator.from_operator(self, kwargs)
 
+    def has_mapped_dependants(self) -> bool:
+        """Whether any downstream dependencies depend on this task for mapping."""
+        from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+        if not self.has_dag():
+            return False
+
+        def _walk_group(group: TaskGroup) -> Iterable[Tuple[str, DAGNode]]:
+            """Recursively walk children in a task group.
+
+            This yields all direct children (including both tasks and task
+            groups), and all children of any task groups.
+            """
+            for key, child in group.children.items():
+                yield key, child
+                if isinstance(child, TaskGroup):
+                    yield from _walk_group(child)
+
+        for key, child in _walk_group(self.dag.task_group):
+            if key == self.task_id:
+                continue
+            if not isinstance(child, (MappedOperator, MappedTaskGroup)):
+                continue
+            if self.task_id in child.upstream_task_ids:
+                return True
+        return False

Review comment:
       Could you add a comment saying why?
   
   (Because I think it will soon be time to have TaskGroups directly in the graph, rather than tacked on and UI-only as they are now.)







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r784165717



##########
File path: airflow/models/taskmap.py
##########
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Table to store "mapped" task instances (AIP-42)."""

Review comment:
       ```suggestion
   """Table to store information *about* mapped task instances (AIP-42)."""
   ```







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r786073606



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))

Review comment:
       A server default is much preferred here: otherwise filling the new column takes an UPDATE that has to re-write/touch every row, whereas a server default (at least on Postgres) doesn't.
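
To illustrate the point about defaults: in SQLAlchemy, `default=` on a `Column` is applied on the Python side at insert time, while `server_default=` becomes a `DEFAULT` clause in the DDL. In the migration this would presumably mean something like `Column("map_index", Integer, nullable=False, server_default="-1")`, though that exact spelling is an assumption and not taken from the PR. The sketch below uses the standard-library `sqlite3` module instead of Alembic (a substitution made only so it runs anywhere) to show that rows existing before the column was added read back the schema-level default with no explicit UPDATE. The table layout is a reduced, hypothetical version of `task_instance`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Reduced, hypothetical stand-in for the task_instance table.
conn.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, run_id TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('d', 't', 'r1')")
conn.execute("INSERT INTO task_instance VALUES ('d', 't', 'r2')")

# The DEFAULT clause lives in the schema (a "server default"), so the two
# existing rows report -1 without any UPDATE statement being issued.
conn.execute(
    "ALTER TABLE task_instance ADD COLUMN map_index INTEGER NOT NULL DEFAULT -1"
)
existing = [row[0] for row in conn.execute("SELECT map_index FROM task_instance")]

# Rows inserted later that omit the column also get the database-side default.
conn.execute(
    "INSERT INTO task_instance (dag_id, task_id, run_id) VALUES ('d', 't', 'r3')"
)
total = conn.execute(
    "SELECT COUNT(*) FROM task_instance WHERE map_index = -1"
).fetchone()[0]
```

How cheaply a given database applies such a default to existing rows varies by engine and version; the comment above only vouches for Postgres.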







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782769669



##########
File path: airflow/models/baseoperator.py
##########
@@ -1632,6 +1632,33 @@ def defer(
     def map(self, **kwargs) -> "MappedOperator":
         return MappedOperator.from_operator(self, kwargs)
 
+    def has_mapped_dependants(self) -> bool:
+        """Whether any downstream dependencies depend on this task for mapping."""
+        from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+
+        if not self.has_dag():
+            return False
+
+        def _walk_group(group: TaskGroup) -> Iterable[Tuple[str, DAGNode]]:
+            """Recursively walk children in a task group.
+
+            This yields all direct children (including both tasks and task
+            groups), and all children of any task groups.
+            """
+            for key, child in group.children.items():
+                yield key, child
+                if isinstance(child, TaskGroup):
+                    yield from _walk_group(child)
+
+        for key, child in _walk_group(self.dag.task_group):
+            if key == self.task_id:
+                continue
+            if not isinstance(child, (MappedOperator, MappedTaskGroup)):
+                continue
+            if self.task_id in child.upstream_task_ids:
+                return True
+        return False

Review comment:
       Because `downstream_list` misses (mapped) task groups, I believe.
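
A self-contained sketch of the traversal discussed here, using hypothetical `Task` and `Group` dataclasses in place of Airflow's operators and task groups (none of these stand-in names come from the PR). It shows the idea behind `has_mapped_dependants`: walk every node in the group tree, including nested groups, and report whether any mapped node lists the given task as an upstream.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, Set, Tuple, Union


@dataclass
class Task:  # hypothetical stand-in for an operator
    task_id: str
    mapped: bool = False
    upstream_task_ids: Set[str] = field(default_factory=set)


@dataclass
class Group:  # hypothetical stand-in for TaskGroup / MappedTaskGroup
    group_id: str
    mapped: bool = False
    upstream_task_ids: Set[str] = field(default_factory=set)
    children: Dict[str, Union["Group", Task]] = field(default_factory=dict)


Node = Union[Group, Task]


def walk_group(group: Group) -> Iterator[Tuple[str, Node]]:
    """Yield every direct child, then recurse into child groups."""
    for key, child in group.children.items():
        yield key, child
        if isinstance(child, Group):
            yield from walk_group(child)


def has_mapped_dependants(task_id: str, root: Group) -> bool:
    """Whether any mapped task or mapped group depends on ``task_id``."""
    for key, child in walk_group(root):
        if key == task_id:
            continue
        if child.mapped and task_id in child.upstream_task_ids:
            return True
    return False


root = Group(
    "root",
    children={
        "make_list": Task("make_list"),
        "plain": Task("plain", upstream_task_ids={"make_list"}),
        "mapped_group": Group(
            "mapped_group",
            mapped=True,
            upstream_task_ids={"make_list"},
            children={"mapped_group.inner": Task("mapped_group.inner")},
        ),
    },
)
```

Here `make_list` has a mapped dependant only through `mapped_group`, which is the case a task-only downstream list would miss.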







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782776457



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Actually I think this _shouldn’t ever_ be `-1` because `TI.map_index == -1` means the TI is _not_ mapped. Let me check if SQLAlchemy has an unsigned integer column type…







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r783658006



##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2138,14 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session) -> None:
+        if not self.task.has_mapped_dependants():
+            return
+        if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
+            self.log.info("Failing %s for unmappable XCom push %r", self.key, value)
+            raise UnmappableXComPushed(value)

Review comment:
       Changed both logging and exception to only include the variable type instead.
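
A minimal sketch of what reporting only the type could look like. The exception class below is a local stand-in that reuses the name `UnmappableXComPushed` from the diff; its message format and the helper `check_mappable` are assumptions for illustration, not the PR's actual code.

```python
import collections.abc
from typing import Any


class UnmappableXComPushed(Exception):
    """Stand-in exception: records the value's type name, never the value."""

    def __init__(self, value: Any) -> None:
        super().__init__(f"unmappable return type {type(value).__qualname__!r}")


def check_mappable(value: Any) -> int:
    """Return the collection's length, or raise if it cannot be mapped over.

    Strings and bytes are sized collections too, but mapping over their
    characters is almost never intended, so they are rejected explicitly.
    """
    if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
        raise UnmappableXComPushed(value)
    return len(value)
```

With this shape, pushing a multi-megabyte string fails with a message of a few dozen characters, so neither the log line nor the traceback carries the payload.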







[GitHub] [airflow] uranusjr commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r782781047



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer
+
+from airflow.models.base import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", StringID(), primary_key=True),
+        Column("task_id", StringID(), primary_key=True),
+        Column("run_id", StringID(), primary_key=True),
+        Column("map_index", Integer, primary_key=True),

Review comment:
       Oh, TIL the SQL standard never defines an unsigned integer type!
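
Without an unsigned type, one common way to keep a column non-negative is a `CHECK` constraint (SQLAlchemy exposes this as `CheckConstraint`). Whether this PR goes that route is not shown in the thread, so the following is only a sketch of the technique, run against the standard-library `sqlite3` module with a reduced, hypothetical `task_map` table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical, reduced task_map table: map_index must be zero or greater.
conn.execute(
    "CREATE TABLE task_map ("
    " dag_id TEXT, task_id TEXT, run_id TEXT,"
    " map_index INTEGER NOT NULL CHECK (map_index >= 0),"
    " PRIMARY KEY (dag_id, task_id, run_id, map_index))"
)
conn.execute("INSERT INTO task_map VALUES ('d', 't', 'r', 0)")

# -1 is the "not mapped" marker on TaskInstance, so it should never land here.
try:
    conn.execute("INSERT INTO task_map VALUES ('d', 't', 'r', -1)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

row_count = conn.execute("SELECT COUNT(*) FROM task_map").fetchone()[0]
```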







[GitHub] [airflow] ashb commented on pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#issuecomment-1011149780


   > so a bit of mocking is needed. But otherwise things seem to work as expected.
   
   Yeah, I guess those'll be fixed by https://github.com/apache/airflow/pull/20743





[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r783184829



##########
File path: airflow/models/taskinstance.py
##########
@@ -2128,6 +2138,14 @@ def set_duration(self) -> None:
             self.duration = None
         self.log.debug("Task Duration set to %s", self.duration)
 
+    def _record_task_map_for_downstreams(self, value: Any, *, session: Session) -> None:
+        if not self.task.has_mapped_dependants():
+            return
+        if not isinstance(value, collections.abc.Collection) or isinstance(value, (bytes, str)):
+            self.log.info("Failing %s for unmappable XCom push %r", self.key, value)
+            raise UnmappableXComPushed(value)

Review comment:
       I don't think we should include the _whole_ value here -- it could potentially be a huge object/structure/etc. (either here or in the exception itself).







[GitHub] [airflow] ashb commented on a change in pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#discussion_r780201382



##########
File path: airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
##########
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TaskMap and map_index on TaskInstance.
+
+Revision ID: e655c0453f75
+Revises: 587bdf053233
+Create Date: 2021-12-13 22:59:41.052584
+"""
+
+from alembic import op
+from sqlalchemy import Column, ForeignKeyConstraint, Integer, String
+
+from airflow.models.base import COLLATION_ARGS
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# Revision identifiers, used by Alembic.
+revision = "e655c0453f75"
+down_revision = "587bdf053233"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Add TaskMap and map_index on TaskInstance."""
+    # We need to first remove constraints on task_reschedule since they depend on task_instance.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.drop_index("idx_task_reschedule_dag_task_run")
+        batch_op.drop_constraint("task_reschedule_ti_fkey", "foreignkey")
+        if op.get_bind().dialect.name == "mysql":  # MySQL also creates an index.
+            batch_op.drop_index("task_reschedule_ti_fkey")
+
+    # Change task_instance's primary key.
+    with op.batch_alter_table("task_instance") as batch_op:
+        # I think we always use this name for TaskInstance after 7b2661a43ba3?
+        batch_op.drop_constraint("task_instance_pkey", type_="primary")
+        batch_op.add_column(Column("map_index", Integer, default=-1))
+        batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])
+
+    # Re-create task_reschedule's constraints.
+    with op.batch_alter_table("task_reschedule") as batch_op:
+        batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
+        batch_op.create_foreign_key(
+            "task_reschedule_ti_fkey",
+            "task_instance",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ["dag_id", "task_id", "run_id", "map_index"],
+            ondelete="CASCADE",
+        )
+        batch_op.create_index(
+            "idx_task_reschedule_dag_task_run",
+            ["dag_id", "task_id", "run_id", "map_index"],
+            unique=False,
+        )
+
+    # Create task_map.
+    op.create_table(
+        "task_map",
+        Column("dag_id", String(250, **COLLATION_ARGS), primary_key=True),

Review comment:
       Use `StringID` from `airflow.models.base`:
   
   ```suggestion
           Column("dag_id", StringID(), primary_key=True),
   ```







[GitHub] [airflow] uranusjr edited a comment on pull request #20286: Add TaskMap and TaskInstance.map_id

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #20286:
URL: https://github.com/apache/airflow/pull/20286#issuecomment-1008546911


   I think this is ready-ish. Probably needs more eyes on the migration code.

