Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/24 13:17:23 UTC

[GitHub] [airflow] thejens commented on a change in pull request #15367: Implement BigQuery Table Schema Update Operator

thejens commented on a change in pull request #15367:
URL: https://github.com/apache/airflow/pull/15367#discussion_r619658132



##########
File path: airflow/providers/google/cloud/hooks/bigquery.py
##########
@@ -1353,6 +1353,86 @@ def get_schema(self, dataset_id: str, table_id: str, project_id: Optional[str] =
         table = self.get_client(project_id=project_id).get_table(table_ref)
         return {"fields": [s.to_api_repr() for s in table.schema]}
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_table_schema(
+        self,
+        schema_fields_updates: List[Dict[str, Any]],
+        dataset_id: str,
+        table_id: str,
+        project_id: Optional[str] = None,
+    ) -> None:
+        """
+        Update fields within a schema for a given dataset and table. Note that
+        some fields in schemas are immutable and trying to change them will cause
+        an exception.
+        If a new field is included it will be inserted which requires all required fields to be set.
+        See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        :param dataset_id: the dataset ID of the requested table to be updated
+        :type dataset_id: str
+        :param table_id: the table ID of the table to be updated
+        :type table_id: str
+        :param schema_fields_updates: a partial schema resource. see
+            https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
+
+        **Example**: ::
+
+            schema_fields_updates=[
+                {"name": "emp_name", "description": "Some New Description"},
+                {"name": "salary", "description": "Some New Description"},
+                {"name": "departments", "fields": [
+                    {"name": "name", "description": "Some New Description"},
+                    {"name": "type", "description": "Some New Description"}
+                ]},
+            ]
+
+        :type schema_fields_updates: List[dict]
+        :param project_id: The name of the project where we want to update the table.
+        :type project_id: str
+        """
+
+        def _build_new_schema(
+            current_schema: List[Dict[str, Any]], schema_fields_updates: List[Dict[str, Any]]
+        ) -> List[Dict[str, Any]]:
+
+            # Turn schema_fields_updates into a dict keyed on field names
+            schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
+
+            # Create a new dict for storing the new schema, initialized from current_schema;
+            # as of Python 3.6, dicts retain insertion order.
+            new_schema = {field["name"]: field for field in deepcopy(current_schema)}
+
+            # Each item in schema_fields_updates contains a potential patch
+            # to a schema field, iterate over them
+            for field_name, patched_value in schema_fields_updates.items():
+                # If this field already exists, update it
+                if field_name in new_schema:
+                    # If this field is of type RECORD and has a fields key we need to patch it recursively
+                    if "fields" in patched_value:
+                        patched_value["fields"] = _build_new_schema(
+                            new_schema[field_name]["fields"], patched_value["fields"]
+                        )
+                    # Update the new_schema with the patched value
+                    new_schema[field_name].update(patched_value)
+                # This is a new field, just include the whole configuration for it
+                else:
+                    new_schema[field_name] = patched_value
+
+            return list(new_schema.values())
+
+        current_table_schema = self.get_schema(

Review comment:
       Well, if the current table has policyTags and you filter them out of the schema included in the update, my understanding is that you will in effect delete them, and deleting them requires its own access. Leaving them unchanged may work, though I haven't actually tried it. In the worst case, you will need an additional access level to modify any schema on a table with policyTags.
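
       To make the two cases concrete, here is a minimal sketch that runs the merge logic from the hunk above as a standalone function, with no BigQuery calls. `build_new_schema` is a copy of `_build_new_schema` lifted out of the hook, `strip_policy_tags` is a hypothetical helper standing in for "filtering them out", and the schema and policy tag value are made-up placeholders. It only shows what payload would be sent in each case; how the API reacts to either payload is not verified here.

```python
from copy import deepcopy
from typing import Any, Dict, List


def build_new_schema(
    current_schema: List[Dict[str, Any]],
    schema_fields_updates: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Standalone copy of the merge logic shown in the diff hunk."""
    updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}
    new_schema = {field["name"]: field for field in deepcopy(current_schema)}
    for field_name, patched_value in updates.items():
        if field_name in new_schema:
            # Nested RECORD fields are patched recursively
            if "fields" in patched_value:
                patched_value["fields"] = build_new_schema(
                    new_schema[field_name]["fields"], patched_value["fields"]
                )
            # dict.update only overwrites keys present in the patch
            new_schema[field_name].update(patched_value)
        else:
            new_schema[field_name] = patched_value
    return list(new_schema.values())


def strip_policy_tags(schema: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical helper: drop policyTags from every field, recursively."""
    stripped = []
    for field in schema:
        field = {key: value for key, value in field.items() if key != "policyTags"}
        if "fields" in field:
            field["fields"] = strip_policy_tags(field["fields"])
        stripped.append(field)
    return stripped


# Made-up current schema; the policy tag name is a placeholder, not a real resource
current = [
    {"name": "emp_name", "type": "STRING", "mode": "NULLABLE"},
    {
        "name": "salary",
        "type": "INTEGER",
        "mode": "NULLABLE",
        "policyTags": {"names": ["placeholder-policy-tag"]},
    },
]
patch = [{"name": "salary", "description": "Some New Description"}]

# Case 1: merge only. policyTags ride along unchanged in the payload.
merged = build_new_schema(current, patch)

# Case 2: merge, then filter. The payload no longer mentions policyTags.
filtered = strip_policy_tags(merged)
```

       In the first case the payload carries the existing policyTags untouched; in the second it omits them, which is the situation I would expect to be treated as a deletion.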



