Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/14 09:44:52 UTC

[GitHub] [airflow] thejens opened a new pull request #15367: Implement BigQuery Table Schema Patch Operator

thejens opened a new pull request #15367:
URL: https://github.com/apache/airflow/pull/15367


   This change implements a new operator that patches table schemas in BigQuery.
   
   This is needed because typing out an entire schema definition just to set, for example, a description on a single field is a lot of overhead. In addition, the schema is often unknown or very complex, since it may be the result of a query or may have been inferred automatically when importing files as tables.
   
   This operator is useful for a workflow like:
   Upstream: Create a BigQuery table as the output of a query or import operator. The writer of the job/operator knows the names of the fields, perhaps the types, but not necessarily how the rest of the schema is defined.
   
   Downstream (this operator): Supply a partial schema definition that contains only field names and description values, which are patched onto the BigQuery-generated schema from upstream.
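
To make the workflow above concrete, here is a minimal sketch of how a partial schema might be merged onto a generated one. It is plain Python with no BigQuery calls; the function name `merge_schema` and the sample field values are assumptions made for illustration, not code from this pull request.

```python
from copy import deepcopy
from typing import Any, Dict, List

Field = Dict[str, Any]


def merge_schema(current: List[Field], updates: List[Field]) -> List[Field]:
    """Return a new field list: `current` with `updates` applied by field name."""
    # Copy every existing field so the caller's schema is left untouched.
    merged = {field["name"]: deepcopy(field) for field in current}
    for update in updates:
        name = update["name"]
        if name not in merged:
            # Unknown name: treated as a new field, taken over as supplied.
            merged[name] = deepcopy(update)
            continue
        # Apply scalar keys first, then recurse into nested RECORD fields.
        merged[name].update({k: v for k, v in update.items() if k != "fields"})
        if "fields" in update:
            merged[name]["fields"] = merge_schema(
                merged[name].get("fields", []), update["fields"]
            )
    return list(merged.values())


# Hypothetical schema as BigQuery might have generated it upstream.
generated = [
    {"name": "emp_name", "type": "STRING", "mode": "NULLABLE"},
    {"name": "departments", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
    ]},
]
# Partial schema supplied downstream: names and descriptions only.
partial = [
    {"name": "emp_name", "description": "Full name of the employee"},
    {"name": "departments", "fields": [
        {"name": "name", "description": "Department name"},
    ]},
]

patched = merge_schema(generated, partial)
```

In this sketch the types and modes from the generated schema survive, and only the descriptions are added.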
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r619658132



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initialized from the current_schema;
+            # dicts retain insertion order (guaranteed as of Python 3.7).
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       Well, if the current table has policyTags and you filter them out of the schema included in the update, my understanding is that you will in effect delete them, which requires access to do. Leaving them unchanged may work, though I haven't actually tried it. Worst case, you will need an additional access level to modify any schema on a table with policyTags.
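
For reference, "filtering policyTags out of the schema" could look like the sketch below. The helper name `without_policy_tags` is hypothetical, and the sketch only shows the local filtering step; it makes no claim about how BigQuery treats a field whose `policyTags` key is omitted from an update.

```python
from copy import deepcopy
from typing import Any, Dict, List

Field = Dict[str, Any]


def without_policy_tags(fields: List[Field]) -> List[Field]:
    """Return a copy of `fields` with every "policyTags" key removed, at any depth."""
    cleaned = []
    for field in fields:
        field = deepcopy(field)          # never touch the caller's schema
        field.pop("policyTags", None)    # no error if the key is absent
        if "fields" in field:
            # RECORD fields carry their own nested field list.
            field["fields"] = without_policy_tags(field["fields"])
        cleaned.append(field)
    return cleaned


# Hypothetical schema; the taxonomy path is a made-up placeholder.
schema = [
    {"name": "salary", "type": "INTEGER",
     "policyTags": {"names": ["projects/p/locations/l/taxonomies/t/policyTags/x"]}},
    {"name": "departments", "type": "RECORD", "fields": [
        {"name": "name", "type": "STRING", "policyTags": {"names": []}},
    ]},
]
stripped = without_policy_tags(schema)
```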







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r620447804



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initialized from the current_schema;
+            # dicts retain insertion order (guaranteed as of Python 3.7).
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       I have now added a parameter to control whether to include policyTags in the update of the schema.







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614776461



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
       _patch_schema doesn't actually patch the schema on the table, so it doesn't interface with BigQuery. What it does is take a schema object and alter the fields to be changed; after that, the new schema is used as input to update_table, where the actual call to the BigQuery API is made. The leading underscore indicates that it is a private method not intended to be reused outside this class; however, it has to be broken out into a function for recursion to work properly.







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r620395703



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initialized from the current_schema;
+            # dicts retain insertion order (guaranteed as of Python 3.7).
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       I see. I found that the description and policyTags fields are optional on the update request and are only updated if present; otherwise the old value remains. That is different from "name" and "type", which are required and immutable. I take it there is a use for a parameter on the operator that determines whether to include policyTags in the update. I will go ahead and add that. Thanks for the feedback, and today I learned something new!
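
Since the comment above describes "type" as immutable, a local pre-flight check could flag offending updates before any request is sent. The following is only a sketch under that assumption; the helper `immutable_conflicts` is hypothetical and is not part of this pull request or of the BigQuery client.

```python
from typing import Any, Dict, List

Field = Dict[str, Any]


def immutable_conflicts(
    current: List[Field], updates: List[Field], prefix: str = ""
) -> List[str]:
    """List dotted paths of existing fields whose "type" an update tries to change."""
    conflicts: List[str] = []
    by_name = {field["name"]: field for field in current}
    for update in updates:
        existing = by_name.get(update["name"])
        if existing is None:
            continue  # a new field cannot conflict with an existing type
        path = prefix + update["name"]
        if "type" in update and update["type"] != existing.get("type"):
            conflicts.append(path)
        # Check nested RECORD fields the same way.
        conflicts.extend(
            immutable_conflicts(
                existing.get("fields", []), update.get("fields", []), path + "."
            )
        )
    return conflicts


current = [
    {"name": "salary", "type": "INTEGER"},
    {"name": "departments", "type": "RECORD", "fields": [
        {"name": "name", "type": "STRING"},
    ]},
]
updates = [
    {"name": "salary", "type": "STRING", "description": "Monthly salary"},
    {"name": "departments", "fields": [{"name": "name", "type": "INTEGER"}]},
    {"name": "hired_on", "type": "DATE"},
]
problems = immutable_conflicts(current, updates)
```

A caller could raise an error when the returned list is non-empty rather than waiting for the API to reject the update.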







[GitHub] [airflow] thejens commented on pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#issuecomment-822405882


   @marcosmarxm 
   Before I change more: I believe these are the three outstanding items from your review. How strongly do you feel about them? Before I go ahead and modify anything, I'm keen to present some of the reasons for my decisions.
   
   How important would you say it is to move the functionality into a hook? Solving the use case was easy using two already existing hooks, and I don't think I need to create an additional hook, given that I don't implement any new functionality in how we talk to BigQuery.
   
   Further, I think mutating a mutable object shouldn't be a problem in this case, and I am not too keen to deepcopy it just for the sake of it.
   
   Lastly, the naming of the schema_fields parameter: I wanted the name and input format of that field to be consistent between this and other operators. Do you think that's a problem? Changing it to updates_to_schema_fields or similar is an easy possibility.
   
   





[GitHub] [airflow] marcosmarxm commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614882725



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
+        """
+        Updates the content of "old_schema" with
+        the fields from new_schema. Makes changes
+        in place and hence has no return value.
+        Works recursively for sub-records.
+        Start by turning the schema list of fields into
+        a dict keyed on the field name for both the old
+        and the new schema.
+
+        :param old_schema: Old schema which is updated in-place
+        :type old_schema: dict
+        :param new_schema: Partial schema definition used to patch old schema
+        :type new_schema: dict
+        """
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})

Review comment:
       In-place modification of the variable can be very tricky; why not create a new variable `updated_schema`?
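
One way to see the concern is to copy the `_patch_schema` logic into a plain function and watch what happens to its inputs. In the sketch below, `patch_in_place` mirrors the quoted method, while `patched_copy` is a hypothetical wrapper that returns a separate `updated_schema`; the sample schemas are made up for illustration.

```python
from copy import deepcopy
from typing import Any, Dict


def patch_in_place(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> None:
    """Plain-function copy of the _patch_schema logic: mutates both arguments."""
    new_fields = {f["name"]: f for f in new_schema["fields"] if "name" in f}
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    for name, new_field in new_fields.items():
        if name in old_fields:
            old_field = old_fields[name]
            if "fields" in new_field:
                patch_in_place(old_field, new_field)
                del new_field["fields"]  # side effect on the caller's patch
            old_field.update(new_field)
        else:
            old_schema["fields"].append(new_field)


def patched_copy(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical wrapper: patch deep copies and return the result."""
    updated_schema = deepcopy(old_schema)
    patch_in_place(updated_schema, deepcopy(new_schema))
    return updated_schema


table_schema = {"fields": [
    {"name": "departments", "type": "RECORD", "fields": [
        {"name": "name", "type": "STRING"},
    ]},
]}
patch = {"fields": [
    {"name": "departments", "fields": [
        {"name": "name", "description": "Department name"},
    ]},
]}

# The copying wrapper leaves both inputs as they were.
updated_schema = patched_copy(table_schema, patch)
inputs_untouched = (
    "fields" in patch["fields"][0]
    and "description" not in table_schema["fields"][0]["fields"][0]
)

# Calling the in-place version directly strips the nested "fields" key from
# the patch, so applying the same patch object again would skip the nested update.
patch_in_place(table_schema, patch)
patch_lost_nested_fields = "fields" not in patch["fields"][0]
```

The trade-off is an extra deep copy per call, which seems small for schema-sized dictionaries.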

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
       Hooks are interfaces for Operators; see the concepts of [Operator](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#operators) and [Hook](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#hooks). Take a look at all the other Operators in the BigQuery file.
   
   IMHO, moving the function `_patch_schema` to BigQueryHook and creating a public function called `update_schema` will be more useful in the future. Changing the name is also important: all other functions called `patch` are deprecated and now use `update` instead, and you already use the term "update" in the documentation.
   @turbaszek what do you think about it?
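
   A rough sketch of the layering proposed above, under stated assumptions. `InMemoryHook` is a hypothetical stand-in, not `BigQueryHook`: it keeps one schema in memory and only mimics the `get_schema` and `update_table` calls that the operator in this diff uses. The method name `update_schema` is the one suggested in this comment, and the merge is top-level only to keep the example short.

```python
from typing import Any, Dict, List


class InMemoryHook:
    """Hypothetical stand-in for a hook; it stores one table schema in memory."""

    def __init__(self, schema: Dict[str, Any]) -> None:
        self._schema = schema

    def get_schema(self, dataset_id: str, table_id: str) -> Dict[str, Any]:
        return self._schema

    def update_table(
        self, table_resource: Dict[str, Any], fields: List[str], dataset_id: str, table_id: str
    ) -> Dict[str, Any]:
        # A real hook would call the BigQuery API here
        self._schema = table_resource["schema"]
        return table_resource

    def update_schema(
        self, schema_fields: List[Dict[str, Any]], dataset_id: str, table_id: str
    ) -> Dict[str, Any]:
        """Hypothetical public method: read, merge and write back in one call."""
        schema = self.get_schema(dataset_id=dataset_id, table_id=table_id)
        by_name = {field["name"]: field for field in schema["fields"]}
        for patch in schema_fields:
            if patch["name"] in by_name:
                by_name[patch["name"]].update(patch)  # top-level merge only
            else:
                schema["fields"].append(patch)
        return self.update_table(
            table_resource={"schema": schema},
            fields=["schema"],
            dataset_id=dataset_id,
            table_id=table_id,
        )


# With the logic in the hook, the operator's execute() reduces to one call
hook = InMemoryHook({"fields": [{"name": "salary", "type": "INTEGER"}]})
result = hook.update_schema(
    [{"name": "salary", "description": "Monthly salary"}],
    dataset_id="my_dataset",
    table_id="my_table",
)
```

   The point of the layout is discoverability: someone reading the hook's API docs would find the schema update there, and the operator stays a thin wrapper.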
   

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict

Review comment:
       `List[dict]`

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
+        """
+        Updates the content of "old_schema" with
+        the fields from new_schema. Makes changes
+        in place and hence has no return value.
+        Works recursively for sub-records.
+        Start by turning the schema list of fields into
+        a dict keyed on the field name for both the old
+        and the new schema.
+
+        :param old_schema: Old schema which is updated in-place
+        :type old_schema: dict
+        :param new_schema: Partial schema definition used to patch old schema
+        :type new_schema: dict
+        """
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],

Review comment:
       Because your Operator uses this as the new schema, would it be better to rename it to something more intuitive, e.g. `new_schema_fields`?

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):

Review comment:
       `Patch*Operator` names are deprecated; call it `BigQueryUpdateTableSchemaOperator` to follow the current convention.







[GitHub] [airflow] marcosmarxm commented on pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#issuecomment-822602690


   > @marcosmarxm
   > Before I change more; I believe these are the three outstanding items from your review. How strongly do you feel about these? As in - before I go ahead and modify I'm keen to present some "why's" for my decisions...
   > 
   > How important would you say it is to move functionality into a hook? Solving the use-case was easy using two already existing hooks and I don't think I need to create an additional hook given I don't implement any new functionality in how we talk with BigQuery
   > 
   > Further I think mutating a mutable object shouldn't be a problem in this case and I am not too keen to deepcopy it for the sake of it.
   > 
   > Lastly the naming of the schema_fields parameter, I wanted to be consistent with naming and input format of that field between this and other operators, do you think it's a problem? Changing it into updates_to_schema_fields or similar is an easy possibility.
   
   1. This is only my point of view, but it makes more sense to create a function called `update_schema` in the BigQueryHook. Why? To follow the convention: all functions that interact with BigQuery live in the Hook, not in the Operators. In the future, if someone searches the hook for a method to update a schema, they won't find it. I know the function you created doesn't interact directly with BigQuery, but you are creating the action `update_schema`, and that action does. Imagine a user reading the docs: [BigQuery Hook](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/bigquery/index.html) and [BigQuery Operators](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html). Maybe you can talk to a PMC member/Committer to get better feedback on this topic =D
   2. It's not a problem, and maybe "tricky" is not the best word to describe it. It's just a suggestion to make the `execute` flow more readable and easier to follow. Again, maybe a PMC member/Committer can advise you better on this as well.
   3. I read other operators, and keeping `schema_fields` is not a problem. It's a suggestion to think more about users: if the name stays the same, users may think they need to send the info in the same form as in other Operators. (So it's minor in my opinion.)
   
   The CI broke because of pylint. Can you run pre-commit locally to organize the imports?





[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614929411



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
+        """
+        Updates the content of "old_schema" with
+        the fields from new_schema. Makes changes
+        in place and hence has no return value.
+        Works recursively for sub-records.
+        Start by turning the schema list of fields into
+        a dict keyed on the field name for both the old
+        and the new schema.
+
+        :param old_schema: Old schema which is updated in-place
+        :type old_schema: dict
+        :param new_schema: Partial schema definition used to patch old schema
+        :type new_schema: dict
+        """
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],

Review comment:
       Wanted to be consistent with naming in other operators; can change it if you think intuitiveness > consistency







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614236784



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       ... and it should work with policyTags as well, it would replace the (entire) current list with whatever list you supply
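
       To illustrate the replacement behaviour described here: the patch step ends in `old_field.update(new_field)`, and `dict.update` overwrites the value of a key wholesale rather than merging lists. The tag names below are made-up placeholders, not real policy tag resource names.

```python
# Existing field as it might come back from get_schema (values are illustrative)
old_field = {
    "name": "salary",
    "type": "INTEGER",
    "policyTags": {"names": ["tag_a", "tag_b"]},
}

# Partial definition supplied to the operator
new_field = {"name": "salary", "policyTags": {"names": ["tag_c"]}}

# Same call the patch step ends with: keys in new_field overwrite old values
old_field.update(new_field)

# The old tag list is gone; only the supplied list remains, and "type" is untouched
print(old_field["policyTags"])
```

       So to add a tag while keeping the existing ones, the supplied list has to contain the existing tags as well.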







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614231256



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       Not sure I follow. The field type is immutable AFAIK, at least for tables with data in them. For the description you can add anything; it will replace whatever was in the description before.







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614930112



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
+        """
+        Updates the content of "old_schema" with
+        the fields from new_schema. Makes changes
+        in place and hence has no return value.
+        Works recursively for sub-records.
+        Start by turning the schema list of fields into
+        a dict keyed on the field name for both the old
+        and the new schema.
+
+        :param old_schema: Old schema which is updated in-place
+        :type old_schema: dict
+        :param new_schema: Partial schema definition used to patch old schema
+        :type new_schema: dict
+        """
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})

Review comment:
       To introduce that variable I'd also need some form of deepcopy. I guess if I move it into the hook instead, deepcopy is already available there.
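
A minimal sketch of why a copy is needed, assuming the in-place logic from the diff above: `del new_field["fields"]` alters the caller's patch, so a non-mutating variant has to copy both inputs first. `_patch_in_place` mirrors `_patch_schema`; `patched_copy` and the sample schemas are hypothetical and not part of the PR.

```python
from copy import deepcopy
from typing import Any, Dict


def _patch_in_place(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> None:
    """Same in-place merge as _patch_schema in the diff above."""
    new_fields = {f["name"]: f for f in new_schema["fields"] if "name" in f}
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    for name, new_field in new_fields.items():
        if name in old_fields:
            old_field = old_fields[name]
            if "fields" in new_field:
                _patch_in_place(old_field, new_field)
                # This line mutates whatever dict the caller passed in.
                del new_field["fields"]
            old_field.update(new_field)
        else:
            old_schema["fields"].append(new_field)


def patched_copy(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical wrapper: patch deep copies so neither argument changes."""
    result = deepcopy(old_schema)
    _patch_in_place(result, deepcopy(new_schema))
    return result


old = {
    "fields": [
        {
            "name": "departments",
            "type": "RECORD",
            "fields": [{"name": "name", "type": "STRING"}],
        }
    ]
}
patch = {
    "fields": [
        {
            "name": "departments",
            "fields": [{"name": "name", "description": "Department name"}],
        }
    ]
}
new = patched_copy(old, patch)
```

Without the two `deepcopy` calls, `patch` would lose its nested "fields" key after the first run, which matters if the same templated `schema_fields` value is reused.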

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):

Review comment:
       changed







[GitHub] [airflow] tswast commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
tswast commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r619501191



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initiated based on the current_schema
+            # as of Python 3.6, dicts retain order.
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       You'll only want to include `policyTags` if you're actually trying to modify those fields.







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r619658132



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initiated based on the current_schema
+            # as of Python 3.6, dicts retain order.
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       Well, if the current table has policyTags and you filter them out of the schema included in the update, it is my understanding that you will in effect delete them, which itself requires access. Leaving them unchanged may work, though I haven't actually tried it. Worst case, you will need an additional access level to modify the schema of any table with policyTags.







[GitHub] [airflow] marcosmarxm commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614321980



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       @thejens I made some suggestions below and explained why using the multi-line docstring can be better.







[GitHub] [airflow] tswast commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
tswast commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r619500931



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initiated based on the current_schema
+            # as of Python 3.6, dicts retain order.
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       I see others have mentioned `policyTags`, but I don't see any logic here to filter out those keys from the `current_table_schema`.
   
   If you leave those in your table update request, it'll require extra permissions, even if you aren't actually making changes. See: https://github.com/googleapis/python-bigquery/pull/557
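
A rough sketch of the kind of filtering meant here, not code from this PR: drop the `policyTags` key from every field (including nested RECORD fields) of the schema read back from the table, and only then apply the user's updates. `strip_policy_tags` and the sample fields are hypothetical, and whether the API treats a missing key as "unchanged" or as "cleared" is not settled in this thread.

```python
from typing import Any, Dict, List

Field = Dict[str, Any]


def strip_policy_tags(fields: List[Field]) -> List[Field]:
    """Return a copy of the field list without any "policyTags" keys."""
    cleaned = []
    for field in fields:
        # Shallow-copy the field while skipping the policyTags key.
        field = {key: value for key, value in field.items() if key != "policyTags"}
        # RECORD fields carry their own nested field list.
        if "fields" in field:
            field["fields"] = strip_policy_tags(field["fields"])
        cleaned.append(field)
    return cleaned


current_fields = [
    {"name": "salary", "type": "INTEGER", "policyTags": {"names": ["example-tag"]}},
    {
        "name": "departments",
        "type": "RECORD",
        "fields": [{"name": "name", "type": "STRING", "policyTags": {"names": []}}],
    },
]
cleaned_fields = strip_policy_tags(current_fields)
```

Updates that explicitly carry `policyTags` would be merged in after this step, so the key is only sent when the caller is actually trying to change it.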







[GitHub] [airflow] thejens commented on pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#issuecomment-825790727


   @marcosmarxm
   Thanks for the feedback. I broke out the functionality into a new hook and added a unit test for it. I also changed the name of the parameter and made the recursive function not rely on mutability.
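
For illustration, the non-mutating merge can be exercised on its own roughly as follows. `build_new_schema` is a standalone copy of the nested `_build_new_schema` helper from the hook diff with shortened variable names; the sample schema and updates are made up for this sketch.

```python
from copy import deepcopy
from typing import Any, Dict, List

Field = Dict[str, Any]


def build_new_schema(current_schema: List[Field], updates: List[Field]) -> List[Field]:
    """Merge partial field definitions into a copy of the current schema."""
    # Work on deep copies so neither argument is modified.
    updates_by_name = {field["name"]: field for field in deepcopy(updates)}
    new_schema = {field["name"]: field for field in deepcopy(current_schema)}
    for name, patch in updates_by_name.items():
        if name in new_schema:
            # Nested RECORD fields are merged recursively.
            if "fields" in patch:
                patch["fields"] = build_new_schema(
                    new_schema[name]["fields"], patch["fields"]
                )
            new_schema[name].update(patch)
        else:
            # Unknown names are appended as new fields.
            new_schema[name] = patch
    return list(new_schema.values())


current = [
    {"name": "emp_name", "type": "STRING"},
    {
        "name": "departments",
        "type": "RECORD",
        "fields": [{"name": "name", "type": "STRING"}],
    },
]
updates = [
    {"name": "emp_name", "description": "Employee name"},
    {"name": "departments", "fields": [{"name": "name", "description": "Department name"}]},
    {"name": "salary", "type": "INTEGER"},
]
current_before, updates_before = deepcopy(current), deepcopy(updates)
result = build_new_schema(current, updates)
```

Existing fields keep their position, new fields land at the end, and both inputs compare equal to their snapshots afterwards.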





[GitHub] [airflow] marcosmarxm commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614207197



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       Can you add type annotations and also replace the single-line comments with a docstring?

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
       It looks like this operation and function would be better placed in the BigQueryHook, imho.







[GitHub] [airflow] tswast commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
tswast commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r620364813



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_field_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initiated based on the current_schema
+            # as of Python 3.6, dicts retain order.
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       > Well, if the current table has policyTags and you filter them out from the schema included in the update, it is my understanding that you will in effect delete them
   
   This is false.
   
   The way it works is that only the keys present in the JSON are updated. To remove policy tags, you'd have to include the `policyTags` key and set it to an empty list (or maybe None).
   
   Including these keys, even if they are exactly the same as what is present on the table, will attempt to update them, which requires additional permissions beyond those needed for general schema updates. (Thus the 403 error I linked to and the resulting fix.)
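
A minimal sketch of the filtering described above, assuming the schema is a list of field dicts in the REST API shape. The helper name `strip_policy_tags` and the sample policy tag resource name are hypothetical and are not part of the PR. Because omitted keys are left untouched, dropping `policyTags` before sending the update should avoid the extra permission check without deleting the tags on the table.

```python
from copy import deepcopy
from typing import Any, Dict, List


def strip_policy_tags(fields: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a copy of `fields` with every `policyTags` key removed, recursively."""
    cleaned = []
    for field in deepcopy(fields):
        # Omitting the key (rather than sending it unchanged) means the
        # update request does not touch the policy tags at all.
        field.pop("policyTags", None)
        if "fields" in field:
            field["fields"] = strip_policy_tags(field["fields"])
        cleaned.append(field)
    return cleaned


# Made-up schema for illustration only.
schema = [
    {
        "name": "emp_name",
        "type": "STRING",
        "policyTags": {"names": ["projects/p/locations/l/taxonomies/t/policyTags/x"]},
    },
    {
        "name": "departments",
        "type": "RECORD",
        "fields": [{"name": "name", "type": "STRING", "policyTags": {"names": []}}],
    },
]
stripped = strip_policy_tags(schema)
```

The input list is deep-copied, so the schema read from the table is left as it was.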







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614775355



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       Thanks, will change this!







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614945105



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
+        """
+        Updates the content of "old_schema" with
+        the fields from new_schema. Makes changes
+        in place and hence has no return value.
+        Works recursively for sub-records.
+        Start by turning the schema list of fields into
+        a dict keyed on the field name for both the old
+        and the new schema.
+
+        :param old_schema: Old schema which is updated in-place
+        :type old_schema: dict
+        :param new_schema: Partial schema definition used to patch old schema
+        :type new_schema: dict
+        """
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})

Review comment:
       Not sure I agree that it is tricky in a setting like this, though; it's just a mutable object that I change.
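
One possible illustration of where in-place patching can surprise a caller, assuming the `_patch_schema` logic quoted in the diff above. The function below is a standalone copy of that logic under the hypothetical name `patch_schema_in_place`, and the sample schema is made up. Besides updating the table schema, the `del new_field["fields"]` step also removes the nested `fields` key from the patch that the caller passed in.

```python
from copy import deepcopy


def patch_schema_in_place(old_schema, new_schema):
    # Standalone copy of the quoted _patch_schema logic (hypothetical name).
    new_fields = {f["name"]: f for f in new_schema["fields"] if "name" in f}
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    for key, new_field in new_fields.items():
        if key in old_fields:
            old_field = old_fields[key]
            if "fields" in new_field:
                patch_schema_in_place(old_field, new_field)
                # This deletes the key from the caller's patch dict too.
                del new_field["fields"]
            old_field.update(new_field)
        else:
            old_schema["fields"].append(new_field)


table_schema = {
    "fields": [
        {
            "name": "departments",
            "type": "RECORD",
            "fields": [{"name": "name", "type": "STRING"}],
        }
    ]
}
patch = [
    {"name": "departments", "fields": [{"name": "name", "description": "Department name"}]}
]
patch_before = deepcopy(patch)

patch_schema_in_place(table_schema, {"fields": patch})

# The table schema was patched as intended, but the patch itself changed as well.
patch_was_mutated = patch != patch_before
```

Working on a `deepcopy` of the patch, as the hook-based version quoted elsewhere in this thread does, would leave the caller's input unchanged.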







[GitHub] [airflow] github-actions[bot] commented on pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#issuecomment-827411546


   [The Workflow run](https://github.com/apache/airflow/actions/runs/788324828) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] marcosmarxm commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614317645



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       ```suggestion
       def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
           """
           Updates the content of "old_schema" with
           the fields from new_schema. Makes changes
           in place and hence has no return value.
           Works recursively for sub-records.
   
           Start by turning the schema list of fields into
           a dict keyed on the field name for both the old
           and the new schema.
           
           :param old_schema: Old schema...
           :type old_schema: dict
           """
   ```
   
   This way, information about the function itself will be rendered on the Airflow website and users will have more information about it. Take a look at [an example here](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/bigquery/index.html#airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema).

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
       BigQueryHook has `patch_table`, `patch_dataset`, and `get_schema` methods... maybe the `_patch_schema` function should belong to the BigQueryHook class.
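
A rough sketch of what a hook-friendly version of the merge could look like: a pure function that returns a new field list instead of mutating its arguments. The name `merge_schema_fields` and the sample data are assumptions for illustration; this is not the code that ended up in the hook.

```python
from copy import deepcopy
from typing import Any, Dict, List


def merge_schema_fields(
    current: List[Dict[str, Any]], updates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Return a new field list: `current` with `updates` applied by field name."""
    merged = {field["name"]: field for field in deepcopy(current)}
    for patch in deepcopy(updates):
        name = patch["name"]
        if name in merged:
            if "fields" in patch:
                # Nested RECORD: merge the sub-fields recursively.
                patch["fields"] = merge_schema_fields(
                    merged[name].get("fields", []), patch["fields"]
                )
            merged[name].update(patch)
        else:
            # Unknown field: include it as given.
            merged[name] = patch
    return list(merged.values())


current = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {
        "name": "departments",
        "type": "RECORD",
        "fields": [
            {"name": "name", "type": "STRING"},
            {"name": "type", "type": "STRING"},
        ],
    },
]
updates = [
    {"name": "emp_name", "description": "Some New Description"},
    {"name": "departments", "fields": [{"name": "name", "description": "Some New Description"}]},
]
new_fields = merge_schema_fields(current, updates)
```

Keeping the merge free of side effects makes it straightforward to unit test without a BigQuery connection, which is one argument for placing it next to `get_schema` in the hook.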







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614778960



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
       Do you think it would be better if I just defined the function inside the execute scope? I could move all the logic from the operator into a new hook, but then I don't see the point of separating hooks and operators if operators are just wrappers.







[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614928939



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with
+        # the fields from new_schema. Makes changes
+        # in place and hence has no return value.
+        # Works recursively for sub-records
+
+        # Start by turning the schema list of fields into
+        # a dict keyed on the field name for both the old
+        # and the new schema
+        new_fields = {field["name"]: field for field in new_schema["fields"] if "name" in field}
+        old_fields = {field["name"]: field for field in old_schema["fields"]}
+
+        # Iterate over all new fields and update the
+        # old_schema dict
+        for field_key in new_fields.keys():
+            # Check if the field exists in the old_schema, if
+            # so change it
+            if field_key in old_fields:
+                old_field = old_fields[field_key]
+                new_field = new_fields[field_key]
+                # Check if recursion is needed
+                if "fields" in new_field:
+                    cls._patch_schema(old_field, new_field)
+                    del new_field["fields"]
+
+                # Do the update
+                old_field.update(new_field)
+
+            # Field didn't exist, add it as a new field
+            else:
+                old_schema["fields"].append(new_fields[field_key])
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        dataset_id: Optional[str] = None,
+        schema_fields: List[Dict[str, Any]],
+        table_id: Optional[str] = None,
+        project_id: Optional[str] = None,
+        gcp_conn_id: str = 'google_cloud_default',
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        self.schema_fields = schema_fields
+        self.table_id = table_id
+        self.dataset_id = dataset_id
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        super().__init__(**kwargs)
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        table_schema = bq_hook.get_schema(
+            dataset_id=self.dataset_id, table_id=self.table_id, project_id=self.project_id
+        )
+
+        self._patch_schema(old_schema=table_schema, new_schema={"fields": self.schema_fields})
+
+        return bq_hook.update_table(
+            table_resource={"schema": table_schema},
+            fields=["schema"],
+            dataset_id=self.dataset_id,
+            table_id=self.table_id,
+            project_id=self.project_id,
+        )

Review comment:
       I changed the names to use "update" instead of "patch". I do still see a difference, though: this operator does not require an entire new schema to be set, it only alters the existing fields. I have no strong feelings about the naming.
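
To make the distinction concrete, here is a minimal standalone sketch of the patch semantics discussed in this comment. The function name `patch_schema` and the sample schemas are illustrative assumptions rather than code from this PR. It follows the same merge logic as `_patch_schema` in the diff, except that it copies keys instead of deleting `"fields"` from the supplied partial schema, so the caller's input is left unmodified.

```python
from typing import Any, Dict


def patch_schema(old_schema: Dict[str, Any], new_schema: Dict[str, Any]) -> None:
    """Merge the fields of new_schema into old_schema in place (hypothetical helper)."""
    # Index the existing fields by name so lookups are direct.
    old_fields = {field["name"]: field for field in old_schema["fields"]}

    for new_field in new_schema["fields"]:
        if "name" not in new_field:
            # Entries without a name cannot be matched, so they are skipped.
            continue

        old_field = old_fields.get(new_field["name"])
        if old_field is None:
            # The field did not exist before: append it as a new field.
            old_schema["fields"].append(new_field)
            continue

        if "fields" in new_field:
            # Sub-record: merge the nested fields recursively.
            patch_schema(old_field, new_field)

        # Copy every key except "fields", so nested lists are merged, not replaced.
        old_field.update({k: v for k, v in new_field.items() if k != "fields"})


# Assumed sample data: a schema as it might come back from BigQuery.
existing = {
    "fields": [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "departments", "type": "RECORD", "mode": "REPEATED", "fields": [
            {"name": "name", "type": "STRING"},
            {"name": "type", "type": "STRING"},
        ]},
    ]
}

# Partial schema: only names and the descriptions to set.
patch = {
    "fields": [
        {"name": "emp_name", "description": "Name of the employee"},
        {"name": "departments", "fields": [
            {"name": "name", "description": "Department name"},
        ]},
    ]
}

patch_schema(existing, patch)
print(existing["fields"][0])
```

The existing `type` and `mode` values survive the merge, which is the point of patching: with a full replacement, the caller would have to supply the complete definition of every field.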

##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,155 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict

Review comment:
       changed.







[GitHub] [airflow] potiuk commented on pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#issuecomment-832072118


   Thanks for the thorough reviews @marcosmarxm and @tswast. Great job @thejens!





[GitHub] [airflow] potiuk merged pull request #15367: Implement BigQuery Table Schema Update Operator

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #15367:
URL: https://github.com/apache/airflow/pull/15367


   





[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614232691



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       ... you can check the unit test for an example if you want.







[GitHub] [airflow] marcosmarxm commented on a change in pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r614317645



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2039,6 +2039,149 @@ def execute(self, context) -> None:
         )
 
 
+class BigQueryPatchTableSchemaOperator(BaseOperator):
+    """
+    Patch BigQuery Table Schema
+    Updates fields on a table schema based on contents of the supplied schema
+    parameter. The supplied schema does not need to be complete, if the field
+    already exists in the schema you only need to supply a schema with the
+    fields you want to patch and the "name" key set on the schema resource.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryPatchTableSchemaOperator`
+
+    :param dataset_id: A dotted
+        ``(<project>.|<project>:)<dataset>`` that indicates which dataset
+        will be updated. (templated)
+    :type dataset_id: str
+    :param schema_fields: a partial schema resource. see
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+    **Example**: ::
+
+        schema_fields=[
+            {"name": "emp_name", "description": "Some New Description"},
+            {"name": "salary", "description": "Some New Description"},
+            {"name": "departments", "fields": [
+                {"name": "name", "description": "Some New Description"},
+                {"name": "type", "description": "Some New Description"}
+            ]},
+        ]
+
+    :type schema_fields: dict
+    :param project_id: The name of the project where we want to update the dataset.
+        Don't need to provide, if projectId in dataset_reference.
+    :type project_id: str
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
+    :type gcp_conn_id: str
+    :param bigquery_conn_id: (Deprecated) The connection ID used to connect to Google Cloud.
+        This parameter has been deprecated. You should pass the gcp_conn_id parameter instead.
+    :type bigquery_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: str
+    :param location: The location used for the operation.
+    :type location: str
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
+    :type impersonation_chain: Union[str, Sequence[str]]
+    """
+
+    template_fields = (
+        'schema_fields',
+        'dataset_id',
+        'table_id',
+        'project_id',
+        'impersonation_chain',
+    )
+    template_fields_renderers = {"schema_fields": "json"}
+    ui_color = BigQueryUIColors.TABLE.value
+
+    @classmethod
+    def _patch_schema(cls, old_schema, new_schema):
+        # Updates the content of "old_schema" with

Review comment:
       ```suggestion
       def _patch_schema(cls, old_schema: Dict, new_schema: Dict) -> None:
           """
           Updates the content of "old_schema" with
           the fields from new_schema. Makes changes
           in place and hence has no return value.
           Works recursively for sub-records.
   
           Start by turning the schema list of fields into
           a dict keyed on the field name for both the old
           and the new schema.
           
           :param old_schema: Old schema...
           :type old_schema: dict
           """
   ```
   
   This way, information about the function itself will be rendered on the Airflow website and users will have more information about it. Take a look at [an example here](
   https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/bigquery/index.html#airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_schema)
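
As a small illustration of why the docstring form matters: Python discards `#` comments, while a docstring is stored on the function object, where documentation tooling can read it. The two function names and the parameter wording below are assumptions made up for this sketch, not text from the PR.

```python
import inspect


def with_comment(old_schema, new_schema):
    # Updates the content of "old_schema" with
    # the fields from new_schema.
    return None


def with_docstring(old_schema: dict, new_schema: dict) -> None:
    """
    Updates the content of "old_schema" with the fields from new_schema.

    :param old_schema: schema to modify in place (illustrative wording)
    :type old_schema: dict
    :param new_schema: partial schema to merge in (illustrative wording)
    :type new_schema: dict
    """


# Comments leave no trace on the function object.
print(with_comment.__doc__)

# The docstring is available at runtime, with indentation cleaned up.
print(inspect.getdoc(with_docstring))
```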







[GitHub] [airflow] github-actions[bot] commented on pull request #15367: Implement BigQuery Table Schema Patch Operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#issuecomment-821268825


   [The Workflow run](https://github.com/apache/airflow/actions/runs/756152896) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.

