You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/08 23:43:29 UTC

[GitHub] [airflow] ekarsten opened a new issue #16340: Allow user to pass parameters to PostgresHook in PostgresOperator

ekarsten opened a new issue #16340:
URL: https://github.com/apache/airflow/issues/16340


   **Description**
   
   Currently, the [PostgresOperator](https://github.com/apache/airflow/blob/main/airflow/providers/postgres/operators/postgres.py) defines the database hook in a fixed way: 
   ```self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)```
   This makes the operator inflexible for certain tech stacks. For example, long queries in Redshift will sometimes fail (in my experience) with `psycopg2.operationalerror: SSL SYSCALL error: EOF detected` because the hook timed out on the Redshift side of things. The workaround (described in this [StackExchange post](https://stackoverflow.com/a/63130830/10131478)) that worked for me is to specify some additional arguments in the Hook (see below) and then write my own custom operator. 
   ```
   PostgresHook(
           postgres_conn_id=postgres_conn_id,
           extra={
               "keepalives": 1,
               "keepalives_idle": 30,
               "keepalives_interval": 5,
               "keepalives_count": 5,
           }
       )
   ```
   Now I don't think the default implementation of the PostgresOperator should have these extra parameters, I just think it should be modified to be more flexible so that users can pass arguments to the hook as well. My proposed solution (and I will open a PR after some community input on this solution) would be either 
   
   OPTION 1: (User-specified hook)
   
   ```
       def __init__(
           self,
           *,
           sql: str,
           postgres_conn_id: str = 'postgres_default',
           autocommit: bool = False,
           parameters: Optional[Union[Mapping, Iterable]] = None,
           database: Optional[str] = None,
           hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database),
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.sql = sql
           self.postgres_conn_id = postgres_conn_id
           self.autocommit = autocommit
           self.parameters = parameters
           self.database = database
           self.hook = hook
   
       def execute(self, context):
           self.log.info('Executing: %s', self.sql)
           self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
           for output in self.hook.conn.notices:
               self.log.info(output)
   ```
   
   OPTION 2: (Pass extra params to hook)
   
   ```
       def __init__(
           self,
           *,
           sql: str,
           postgres_conn_id: str = 'postgres_default',
           autocommit: bool = False,
           parameters: Optional[Union[Mapping, Iterable]] = None,
           database: Optional[str] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.sql = sql
           self.postgres_conn_id = postgres_conn_id
           self.autocommit = autocommit
           self.parameters = parameters
           self.database = database
           self.extra = kwargs.pop("extra", None)
           self.hook = None
   
       def execute(self, context):
           self.log.info('Executing: %s', self.sql)
           self.hook = PostgresHook(
               postgres_conn_id=self.postgres_conn_id,
               schema=self.database,
               extra=self.extra
           )
           self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
           for output in self.hook.conn.notices:
               self.log.info(output)
   ```
   
   I slightly favor Option 2 because I don't think one should be using the PostgresOperator with a hook other than the PostgresHook, so really all we want is for users to be able to pass the named parameters through.
   
   **Use case / motivation**
   
   This feature will make the PostgresOperator more flexible, allowing users to pass more arguments to the PostgresHook to make better use of its features.
   
   **Are you willing to submit a PR?**
   
   Yes, I would be happy to submit a PR.
   
   **Related Issues**
   
   I couldn't find a related issue, but those more familiar with the project might identify this as a duplicate and I would be happy to delete if that is the case.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ekarsten closed issue #16340: Allow user to pass parameters to PostgresHook in PostgresOperator

Posted by GitBox <gi...@apache.org>.
ekarsten closed issue #16340:
URL: https://github.com/apache/airflow/issues/16340


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ekarsten commented on issue #16340: Allow user to pass parameters to PostgresHook in PostgresOperator

Posted by GitBox <gi...@apache.org>.
ekarsten commented on issue #16340:
URL: https://github.com/apache/airflow/issues/16340#issuecomment-857898355


   I think this issue may not be a real issue then, it's just a case of user error on my part. I didn't realize these parameters should be passed through the connection, and in troubleshooting my error, I found a hacky way to pass them through the Hook (I have no idea why what I did works, but in my testing it really does appear to consistently work when I run slow insert statements and consistently not work when I take away the extras being passed).
   
   I'm going to close this issue now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on issue #16340: Allow user to pass parameters to PostgresHook in PostgresOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #16340:
URL: https://github.com/apache/airflow/issues/16340#issuecomment-857634303


   Option 2 is better but those parameters should be passed as 'extras' of the connection. Airflow already has support for those parameters to be passed as extras and they can be passed as json-encoded __extra parameter. You can then even add UI fields to be able to configure those fields via UI.
    See https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#connection-uri-format and there are a number of other hooks that are using this mechanism (including extra_dejson method to get those parameter value as json)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #16340: Allow user to pass parameters to PostgresHook in PostgresOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #16340:
URL: https://github.com/apache/airflow/issues/16340#issuecomment-857264068


   Thanks for opening your first issue here! Be sure to follow the issue template!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org