Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/11 03:44:18 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #13510: Add functionality for the SalesforceToGcsOperator

dstandish commented on a change in pull request #13510:
URL: https://github.com/apache/airflow/pull/13510#discussion_r574233410



##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -146,14 +155,17 @@ def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict:
         :return: all instances of the object from Salesforce.
         :rtype: dict
         """
+        if not fields:
+            fields = self.get_available_fields(obj)

Review comment:
       hmm... so you make it optional, but if it's not supplied, it will fail when no object is given either...
    
    starting to wonder if this should be a subclass `SalesforceObjectToGcsOperator` instead of a parameter
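    
    for illustration, a minimal sketch of that subclass idea (a sketch only — the parent's `query` parameter and `salesforce_conn_id` attribute are assumed from the existing operator, not taken from this PR):
    
    ```python
    from airflow.providers.google.cloud.transfers.salesforce_to_gcs import (
        SalesforceToGcsOperator,
    )
    from airflow.providers.salesforce.hooks.salesforce import SalesforceHook


    class SalesforceObjectToGcsOperator(SalesforceToGcsOperator):
        """Sketch: export one whole Salesforce object instead of a raw SOQL query."""

        def __init__(self, *, salesforce_object: str, **kwargs) -> None:
            # Placeholder query; the real one is built in execute(), where the
            # object's fields can be looked up through the hook.
            super().__init__(query="", **kwargs)
            self.salesforce_object = salesforce_object

        def execute(self, context):
            hook = SalesforceHook(salesforce_conn_id=self.salesforce_conn_id)
            fields = hook.get_available_fields(self.salesforce_object)
            self.query = f"SELECT {', '.join(fields)} FROM {self.salesforce_object}"
            return super().execute(context)
    ```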

##########
File path: airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
##########
@@ -121,5 +126,22 @@ def execute(self, context: Dict):
             )
 
             gcs_uri = f"gs://{self.bucket_name}/{self.object_name}"
-            self.log.info("%s uploaded to GCS", gcs_uri)
-            return gcs_uri
+            return_val = {'data': gcs_uri}
+            self.log.info("Data file: %s uploaded to GCS", gcs_uri)
+
+            # Writing schema only works if a salesforce_object is defined

Review comment:
       > Writing schema only works if a salesforce_object is defined
   
    then I'd raise for this combination in `__init__` so it doesn't just silently fail
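    
    e.g. something along these lines in `__init__` (a sketch; `include_schema` is a guessed name for this PR's flag, `salesforce_object` comes from the comment above):
    
    ```python
    from airflow.exceptions import AirflowException

    # In SalesforceToGcsOperator.__init__ (sketch): fail fast at DAG-parse time
    # instead of silently skipping the schema export in execute().
    if include_schema and not salesforce_object:
        raise AirflowException("Writing a schema file requires salesforce_object to be set")
    ```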

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):
+        """
+        Export BigQuery schema to file
+
+        Write a JSON schema file with data types inferred from the Salesforce client.
+
+        :param schema_file_name: file name of the schema file (e.g. schema.JSON)
+        :type schema_file_name: str
+        :param obj: Salesforce object name (e.g. Contact, Asset)
+        :type obj: str
+        :param record_time_added: Whether to add a time_fetched_from_salesforce column
+        :type record_time_added: bool
+        """
+        descriptions = self.describe_object(obj)
+        fields = [field['name'] for field in descriptions['fields']]
+
+        data_types = [field['type'] for field in descriptions['fields']]
+        field_types = [self.type_map.get(data_type, "STRING") for data_type in data_types]
+
+        if record_time_added:
+            fields.append('time_fetched_from_salesforce')
+            field_types.append('TIMESTAMP')
+
+        schema = []
+
+        for field, field_type in zip(fields, field_types):
+            obj = {
+                'name': field,
+                'type': field_type,
+                'mode': 'NULLABLE',
+            }
+            schema.append(obj)
+
+        json_strings = json.dumps(schema)
+
+        json_file = open(schema_file_name, "w")
+        json_file.write(json_strings)
+        json_file.close()

Review comment:
       use a context manager here instead of manually opening and closing the file
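    
    i.e. something like this (same behaviour, but the file gets closed even if the dump raises):
    
    ```python
    # Equivalent write with a context manager; json.dump serializes straight
    # to the open handle.
    with open(schema_file_name, "w") as json_file:
        json.dump(schema, json_file)
    ```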

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):
+        """
+        Export BigQuery schema to file
+
+        Write a JSON schema file with data types inferred from the Salesforce client.
+
+        :param schema_file_name: file name of the schema file (e.g. schema.JSON)

Review comment:
       I'd suggest calling this parameter just `filename` or `path`
    
    there are no other filenames it could be here, and adding a prefix in this scenario sometimes makes you second-guess... is there something else I'm missing?
   
   

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -131,7 +140,7 @@ def get_available_fields(self, obj: str) -> List[str]:
 
         return [field['name'] for field in obj_description['fields']]
 
-    def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict:
+    def get_object_from_salesforce(self, obj: str, fields: Iterable[str] = '', **kwargs) -> dict:

Review comment:
       ```suggestion
       def get_object_from_salesforce(self, obj: str, fields: Iterable[str] = None, **kwargs) -> dict:
       ```

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):

Review comment:
       are you certain the schema cannot be inferred from the query result? e.g. with pyodbc you can
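    
    rough sketch of what that could look like if the query result lands in a pandas DataFrame (as in `object_to_df`); the dtype-kind mapping here is illustrative, not the hook's actual `type_map`:
    
    ```python
    import pandas as pd

    # Illustrative numpy dtype.kind -> BigQuery type mapping; anything else
    # (e.g. object dtype) falls back to STRING, like the type_map default.
    _DTYPE_KIND_TO_BQ = {"i": "INTEGER", "f": "FLOAT", "b": "BOOLEAN", "M": "TIMESTAMP"}


    def infer_schema_from_df(df: pd.DataFrame) -> list:
        """Build a NULLABLE BigQuery schema from a query-result DataFrame."""
        return [
            {
                "name": col,
                "type": _DTYPE_KIND_TO_BQ.get(df[col].dtype.kind, "STRING"),
                "mode": "NULLABLE",
            }
            for col in df.columns
        ]
    ```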

##########
File path: airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
##########
@@ -121,5 +126,22 @@ def execute(self, context: Dict):
             )
 
             gcs_uri = f"gs://{self.bucket_name}/{self.object_name}"
-            self.log.info("%s uploaded to GCS", gcs_uri)
-            return gcs_uri
+            return_val = {'data': gcs_uri}

Review comment:
       this is a breaking change. the task used to return the GCS URI as a plain string and now returns a dict. why?

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):
+        """
+        Export BigQuery schema to file
+
+        Write a JSON schema file with data types inferred from the Salesforce client.
+
+        :param schema_file_name: file name of the schema file (e.g. schema.JSON)
+        :type schema_file_name: str
+        :param obj: Salesforce object name (e.g. Contact, Asset)
+        :type obj: str
+        :param record_time_added: Whether to add a time_fetched_from_salesforce column

Review comment:
       `record_time_added` is a somewhat confusing name
    
    "record" can imply a db record in this context
    
    "fetched" is better than "added"
    
    maybe just `add_timestamp`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org