You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/10/12 00:16:00 UTC

[GitHub] [airflow] milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3

milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3
URL: https://github.com/apache/airflow/pull/6309#discussion_r334209505
 
 

 ##########
 File path: airflow/operators/redshift_to_s3_operator.py
 ##########
 @@ -85,52 +93,16 @@ def __init__(
         self.autocommit = autocommit
         self.include_header = include_header
 
-        if self.include_header and 'PARALLEL OFF' not in [uo.upper().strip() for uo in self.unload_options]:
-            self.unload_options = list(self.unload_options) + ['PARALLEL OFF', ]
+        if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]:
+            self.unload_options = list(self.unload_options) + ['HEADER', ]
 
     def execute(self, context):
-        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        credentials = self.s3.get_credentials()
-        unload_options = '\n\t\t\t'.join(self.unload_options)
-
-        if self.include_header:
-            self.log.info("Retrieving headers from %s.%s...",
-                          self.schema, self.table)
-
-            columns_query = """SELECT column_name
-                                        FROM information_schema.columns
-                                        WHERE table_schema = '{schema}'
-                                        AND   table_name = '{table}'
-                                        ORDER BY ordinal_position
-                            """.format(schema=self.schema,
-                                       table=self.table)
-
-            cursor = self.hook.get_conn().cursor()
-            cursor.execute(columns_query)
-            rows = cursor.fetchall()
-            columns = [row[0] for row in rows]
-            column_names = ', '.join("{0}".format(c) for c in columns)
-            column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns)
-            column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c)
-                                        for c in columns)
-
-            select_query = """SELECT {column_names} FROM
-                                    (SELECT 2 sort_order, {column_castings}
-                                     FROM {schema}.{table}
-                                    UNION ALL
-                                    SELECT 1 sort_order, {column_headers})
-                                 ORDER BY sort_order"""\
-                            .format(column_names=column_names,
-                                    column_castings=column_castings,
-                                    column_headers=column_headers,
-                                    schema=self.schema,
-                                    table=self.table)
-        else:
-            select_query = "SELECT * FROM {schema}.{table}"\
-                .format(schema=self.schema,
-                        table=self.table)
+        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
+        credentials = s3_hook.get_credentials()
+        unload_options = '\n\t\t\t'.join(self.unload_options)
+        select_query = "SELECT * FROM {schema}.{table}".format(schema=self.schema, table=self.table)
 
 Review comment:
   Should we allow the user to choose which columns to select?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services