Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/23 21:18:59 UTC

[GitHub] [airflow] tswast commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

tswast commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r619500931



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable, and trying to change them will cause
+        an exception.
+        If a new field is included, it will be inserted, which requires all of its required properties to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. See
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_fields_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initialized from current_schema;
+            # dicts preserve insertion order (guaranteed since Python 3.7).
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and the patch has a "fields" key, patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
   I see others have mentioned `policyTags`, but I don't see any logic here to filter those keys out of the `current_table_schema`.

   If you leave them in your table update request, it will require extra permissions, even if you aren't actually changing them. See: https://github.com/googleapis/python-bigquery/pull/557
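One way to act on this suggestion, sketched under assumptions: a hypothetical `strip_policy_tags` helper (not part of Airflow or `google-cloud-bigquery`) that drops every `policyTags` key, including those inside nested RECORD `fields`, from the field list returned by `get_schema`, before that list is merged with `schema_fields_updates`. It assumes, as the comment implies, that omitting `policyTags` from the update request leaves the existing tags in place; that behavior should be confirmed against the BigQuery documentation. The schema values below are placeholders.

```python
from typing import Any, Dict, List


def strip_policy_tags(fields: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a copy of ``fields`` with every ``policyTags`` key removed.

    Hypothetical helper; nested RECORD sub-fields are cleaned recursively.
    The input list and its dicts are not modified.
    """
    cleaned = []
    for field in fields:
        # Shallow-copy the field dict, leaving out policyTags.
        new_field = {key: value for key, value in field.items() if key != "policyTags"}
        # RECORD fields carry their sub-fields under "fields"; clean those too.
        if "fields" in new_field:
            new_field["fields"] = strip_policy_tags(new_field["fields"])
        cleaned.append(new_field)
    return cleaned


# Placeholder schema in the shape of get_schema(...)["fields"].
current_schema = [
    {"name": "emp_name", "type": "STRING", "mode": "NULLABLE",
     "policyTags": {"names": ["<policy-tag-resource-name>"]}},
    {"name": "departments", "type": "RECORD", "mode": "REPEATED", "fields": [
        {"name": "name", "type": "STRING", "mode": "NULLABLE",
         "policyTags": {"names": ["<policy-tag-resource-name>"]}},
        {"name": "type", "type": "STRING", "mode": "NULLABLE"},
    ]},
]

cleaned = strip_policy_tags(current_schema)
print(cleaned[0])
# {'name': 'emp_name', 'type': 'STRING', 'mode': 'NULLABLE'}
```

In the hook, assuming `current_table_schema` holds the `{"fields": [...]}` dict that `get_schema` returns, this would amount to passing `strip_policy_tags(current_table_schema["fields"])` to `_build_new_schema`. Because the patches are applied after stripping, a caller who explicitly sets `policyTags` in `schema_fields_updates` would still send them.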



