Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/06 10:34:51 UTC

[GitHub] [airflow] tuanchris opened a new pull request #13510: Add functionality for the SalesforceToGcsOperator

tuanchris opened a new pull request #13510:
URL: https://github.com/apache/airflow/pull/13510


   - Add the ability to write the Salesforce schema when extracting to GCS, so it can be used later to load the data into BigQuery.
   - Add the ability to select all fields (`select *`) from a Salesforce object.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #13510: Add functionality for the SalesforceToGcsOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #13510:
URL: https://github.com/apache/airflow/pull/13510#discussion_r574233410



##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -146,14 +155,17 @@ def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict:
         :return: all instances of the object from Salesforce.
         :rtype: dict
         """
+        if not fields:
+            fields = self.get_available_fields(obj)

Review comment:
       Hmm... so you make `fields` optional, but if it is not supplied, this will fail if `obj` is not an object...
   
   Starting to wonder if this should be a subclass, `SalesforceObjectToGcsOperator`, instead of a parameter.

##########
File path: airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
##########
@@ -121,5 +126,22 @@ def execute(self, context: Dict):
             )
 
             gcs_uri = f"gs://{self.bucket_name}/{self.object_name}"
-            self.log.info("%s uploaded to GCS", gcs_uri)
-            return gcs_uri
+            return_val = {'data': gcs_uri}
+            self.log.info("Data file: %s uploaded to GCS", gcs_uri)
+
+            # Writing schema only works if a salesforce_object is defined

Review comment:
       > Writing schema only works if a salesforce_object is defined
   
   Then I'd raise for this combination in `__init__` so it doesn't just silently fail.
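
A minimal sketch of the fail-fast check suggested above. The class `SchemaExportConfig` and the flag `export_schema` are hypothetical names used only for illustration; `salesforce_object` is taken from the code comment in the diff, and the real operator's signature may differ.

```python
from typing import Optional


class SchemaExportConfig:
    """Hypothetical holder for the arguments that matter for schema export."""

    def __init__(
        self,
        export_schema: bool = False,  # assumed flag name, not from the PR
        salesforce_object: Optional[str] = None,
    ) -> None:
        # Reject the unsupported combination at construction time instead of
        # skipping the schema export silently at execution time.
        if export_schema and not salesforce_object:
            raise ValueError("export_schema=True requires salesforce_object to be set")
        self.export_schema = export_schema
        self.salesforce_object = salesforce_object
```

Raising in the constructor surfaces the misconfiguration when the task is defined rather than after a run has already uploaded data.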

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):
+        """
+        Export BigQuery schema to file
+
+        Write a JSON schema file with data types inferred from the Salesforce client.
+
+        :param schema_file_name: file name of the schema file (e.g schema.JSON)
+        :type schema_file_name: str
+        :param obj: Salesforce object name (e.g. Contact, Asset)
+        :type obj: str
+        :param record_time_added: Wheter to add a time_fetched_from_salesforce column
+        :type record_time_added: bool
+        """
+        descriptions = self.describe_object(obj)
+        fields = [field['name'] for field in descriptions['fields']]
+
+        data_types = [field['type'] for field in descriptions['fields']]
+        field_types = [self.type_map.get(data_type, "STRING") for data_type in data_types]
+
+        if record_time_added:
+            fields.append('time_fetched_from_salesforce')
+            field_types.append('TIMESTAMP')
+
+        schema = []
+
+        for field, field_type in zip(fields, field_types):
+            obj = {
+                'name': field,
+                'type': field_type,
+                'mode': 'NULLABLE',
+            }
+            schema.append(obj)
+
+        json_strings = json.dumps(schema)
+
+        json_file = open(schema_file_name, "w")
+        json_file.write(json_strings)
+        json_file.close()

Review comment:
       Use a context manager (`with open(...)`) here.
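
A sketch of the same file write using a context manager. The standalone function `write_schema` and its parameters are illustrative assumptions, not the hook's actual method; only the schema entry layout (`name`, `type`, `mode`) is taken from the diff above.

```python
import json
from typing import Dict, List


def write_schema(path: str, fields: List[str], field_types: List[str]) -> None:
    """Write a BigQuery-style JSON schema file (illustrative helper)."""
    schema: List[Dict[str, str]] = [
        {"name": name, "type": field_type, "mode": "NULLABLE"}
        for name, field_type in zip(fields, field_types)
    ]
    # The with-block closes the file even if serialization raises.
    with open(path, "w") as json_file:
        json.dump(schema, json_file)
```

With the explicit `open`/`close` pair, an exception between the two calls would leave the file handle open; the `with` statement removes that failure mode.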

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):
+        """
+        Export BigQuery schema to file
+
+        Write a JSON schema file with data types inferred from the Salesforce client.
+
+        :param schema_file_name: file name of the schema file (e.g schema.JSON)

Review comment:
       I'd suggest calling this parameter just `filename` or `path`.
   
   There are no other filenames it could be here, and sometimes adding a prefix in this scenario makes you second-guess: is there something else I'm missing?
   
   

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -131,7 +140,7 @@ def get_available_fields(self, obj: str) -> List[str]:
 
         return [field['name'] for field in obj_description['fields']]
 
-    def get_object_from_salesforce(self, obj: str, fields: Iterable[str]) -> dict:
+    def get_object_from_salesforce(self, obj: str, fields: Iterable[str] = '', **kwargs) -> dict:

Review comment:
       ```suggestion
       def get_object_from_salesforce(self, obj: str, fields: Iterable[str] = None, **kwargs) -> dict:
   ```
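
A small sketch of why a `None` default reads more cleanly than `''` for an `Iterable[str]` parameter. The lookup table and the `resolve_fields` helper are hypothetical stand-ins; the real hook would call Salesforce through `get_available_fields`.

```python
from typing import Dict, Iterable, List, Optional

# Hypothetical stand-in for a Salesforce describe call; not real API data.
_FAKE_DESCRIBE: Dict[str, List[str]] = {"Contact": ["Id", "Name", "Email"]}


def get_available_fields(obj: str) -> List[str]:
    """Return every field name known for ``obj`` (stubbed for illustration)."""
    return list(_FAKE_DESCRIBE[obj])


def resolve_fields(obj: str, fields: Optional[Iterable[str]] = None) -> List[str]:
    """Use the caller's fields, or fall back to all fields when none are given."""
    if fields is None:
        # None is an unambiguous "not supplied" sentinel and matches the type hint.
        return get_available_fields(obj)
    return list(fields)
```

Checking `is None` rather than truthiness also keeps an explicitly passed empty list distinct from an omitted argument, which may or may not be the behavior the PR wants.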

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):

Review comment:
       Are you certain the schema cannot be inferred from the query result? With pyodbc, for example, you can.

##########
File path: airflow/providers/google/cloud/transfers/salesforce_to_gcs.py
##########
@@ -121,5 +126,22 @@ def execute(self, context: Dict):
             )
 
             gcs_uri = f"gs://{self.bucket_name}/{self.object_name}"
-            self.log.info("%s uploaded to GCS", gcs_uri)
-            return gcs_uri
+            return_val = {'data': gcs_uri}

Review comment:
       This is a breaking change (the plain `gcs_uri` string is no longer what gets returned). Why?

##########
File path: airflow/providers/salesforce/hooks/salesforce.py
##########
@@ -336,3 +348,44 @@ def object_to_df(
             df["time_fetched_from_salesforce"] = fetched_time
 
         return df
+
+    def write_schema_to_file(self, schema_file_name: str, obj: str, record_time_added: bool):
+        """
+        Export BigQuery schema to file
+
+        Write a JSON schema file with data types inferred from the Salesforce client.
+
+        :param schema_file_name: file name of the schema file (e.g schema.JSON)
+        :type schema_file_name: str
+        :param obj: Salesforce object name (e.g. Contact, Asset)
+        :type obj: str
+        :param record_time_added: Wheter to add a time_fetched_from_salesforce column

Review comment:
       `record_time_added` is a somewhat confusing name.
   
   "record" can imply a DB record in this context.
   
   "fetched" is better than "added".
   
   Maybe just `add_timestamp`?







[GitHub] [airflow] github-actions[bot] commented on pull request #13510: Add functionality for the SalesforceToGcsOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13510:
URL: https://github.com/apache/airflow/pull/13510#issuecomment-812945492


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] github-actions[bot] closed pull request #13510: Add functionality for the SalesforceToGcsOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #13510:
URL: https://github.com/apache/airflow/pull/13510


   





[GitHub] [airflow] dstandish commented on pull request #13510: Add functionality for the SalesforceToGcsOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #13510:
URL: https://github.com/apache/airflow/pull/13510#issuecomment-777186026


   I suggest changing the title to `Add export object to SalesforceToGcsOperator`.

