Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/05/13 15:59:26 UTC

[GitHub] [airflow] bryanyang0528 commented on a change in pull request #4996: [AIRFLOW-4184] Add an athena helper to insert into table

bryanyang0528 commented on a change in pull request #4996: [AIRFLOW-4184] Add an athena helper to insert into table
URL: https://github.com/apache/airflow/pull/4996#discussion_r279811535
 
 

 ##########
 File path: airflow/contrib/hooks/aws_athena_hook.py
 ##########
 @@ -148,3 +156,105 @@ def stop_query(self, query_execution_id):
         :return: dict
         """
         return self.conn.stop_query_execution(QueryExecutionId=query_execution_id)
+
+
+class AWSAthenaHelpers(AWSAthenaHook):
+    """
+    AWSAthenaHelpers contains helper methods for executing queries against
+    Athena. The methods can be used directly by operators.
+    """
+
+    def __init__(self, aws_conn_id='aws_default', region_name=None, *args, **kwargs):
+        super(AWSAthenaHelpers, self).__init__(
+            aws_conn_id=aws_conn_id, region_name=region_name, *args, **kwargs)
+        self.region_name = region_name
+        self.s3_hook = None
+        self.glue_hook = None
+
+    def get_s3_hook(self):
+        """
+        Return the S3 hook, creating it on first use
+        :return: S3Hook
+        """
+        if not self.s3_hook:
+            self.s3_hook = S3Hook(
+                aws_conn_id=self.aws_conn_id, verify=self.verify)
+        return self.s3_hook
+
+    def get_glue_hook(self):
+        """
+        Return the Glue catalog hook, creating it on first use
+        :return: AwsGlueCatalogHook
+        """
+        if not self.glue_hook:
+            self.glue_hook = AwsGlueCatalogHook(
+                aws_conn_id=self.aws_conn_id, region_name=self.region_name)
+        return self.glue_hook
+
+    def run_insert_into_table(self, src_db, src_table, dst_db, dst_table, mode='error'):
+        """
+        Insert data in S3 from the source table into the destination table
 
 Review comment:
  @RosterIn  Thank you for your suggestion. I know `ALTER TABLE` can assign an S3 location to a partition, but I would like to move the data to the table's own S3 location after creating a new partition.
   
  For example, I have a table called `user_profile` in which I aggregate user behavior data from my website. I have to select data from the source table daily and insert the result into `user_profile`. In Hive, I can just run `insert into table user_profile partition (dt) select count(*), dt from src_table group by dt`, and the data is written under the same S3 location as the target table.
   
  Maybe I could combine `ALTER TABLE` and an S3 copy to implement this idea, roughly as in the sketch below. Does that make sense?
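  
  To be clear, the sketch below is only an illustration and not part of this PR yet: the `insert_into_partition` function, the staging bucket, and the `dt` partition column are made-up names, and it assumes the staged query results already match the destination table's storage format. It only leans on hooks that already exist (`AWSAthenaHook`, `S3Hook`) plus plain boto3 calls.
  
```python
# Illustrative sketch only -- the function name, bucket and partition column are hypothetical.
from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
from airflow.hooks.S3_hook import S3Hook


def insert_into_partition(dst_db, dst_table, dst_location, dt, select_sql):
    athena = AWSAthenaHook(aws_conn_id='aws_default')
    s3 = S3Hook(aws_conn_id='aws_default')

    # 1. Run the SELECT, writing its result files to a staging prefix.
    staging = 's3://my-athena-results/staging/dt={}/'.format(dt)
    qid = athena.run_query(query=select_sql,
                           query_context={'Database': dst_db},
                           result_configuration={'OutputLocation': staging})
    athena.poll_query_status(qid)

    # 2. ALTER TABLE: register the new partition under the table's own location.
    partition_location = '{}/dt={}/'.format(dst_location.rstrip('/'), dt)
    alter_sql = ("ALTER TABLE {}.{} ADD IF NOT EXISTS PARTITION (dt='{}') "
                 "LOCATION '{}'".format(dst_db, dst_table, dt, partition_location))
    qid = athena.run_query(query=alter_sql,
                           query_context={'Database': dst_db},
                           result_configuration={'OutputLocation': staging})
    athena.poll_query_status(qid)

    # 3. S3 copy: copy the staged result files into the partition location
    #    (assumes the staged files match the table's storage format).
    src_bucket, src_prefix = S3Hook.parse_s3_url(staging)
    dst_bucket, dst_prefix = S3Hook.parse_s3_url(partition_location)
    client = s3.get_conn()
    for key in s3.list_keys(bucket_name=src_bucket, prefix=src_prefix) or []:
        client.copy_object(Bucket=dst_bucket,
                           Key=dst_prefix + key.split('/')[-1],
                           CopySource={'Bucket': src_bucket, 'Key': key})
```
  
  If that direction looks reasonable, I can wire the same steps into the helper in this PR.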

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services