Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/29 12:39:50 UTC

[GitHub] [airflow] malthe commented on pull request #15581: Add bindvars Xcom result to Oracle operator

malthe commented on pull request #15581:
URL: https://github.com/apache/airflow/pull/15581#issuecomment-829203997


   The following is an example of an alternative implementation that works with the existing codebase and specifically addresses the requirement of calling a stored procedure that has one or more _out parameters_.
   
   Simply pass a dummy value of the right Python type for each _out parameter_ (e.g. the integer `0` for a number). The entire set of parameters (both in and out directions) is then pushed as an XCom, provided XCom pushing is enabled, which is the default setting.
   
   ```python
   from typing import Iterable, Mapping, Optional, Union
   
   from airflow.models import BaseOperator
   from airflow.providers.oracle.hooks.oracle import OracleHook
   from airflow.utils.decorators import apply_defaults
   
   class OracleCallOperator(BaseOperator):
       ui_color = '#ededed'
   
       @apply_defaults
       def __init__(
           self,
           *,
           function: str,
           oracle_conn_id: str = 'oracle_default',
           parameters: Optional[Union[Mapping, Iterable]] = None,
           autocommit: bool = False,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           self.oracle_conn_id = oracle_conn_id
           self.function = function
           self.autocommit = autocommit
           self.parameters = parameters
   
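       def execute(self, context) -> Optional[Union[list, dict]]:
           self.log.info('Executing: %s', self.function)
           hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
   
           # Normalise the parameters: keep a dict as-is, otherwise materialise a
           # list so that len() works and a missing value (None) becomes empty.
           parameters = (
               self.parameters if isinstance(self.parameters, dict)
               else list(self.parameters or [])
           )
   
           # Named placeholders (:name) for a dict, positional ones (:1, :2, ...) otherwise.
           args = ", ".join(
               f":{name}" for name in (
                   parameters if isinstance(parameters, dict)
                   else range(1, len(parameters) + 1)
               )
           )
   
           sql = f"BEGIN {self.function}({args}); END;"
   
           with hook.get_conn() as conn, conn.cursor() as cursor:
               cursor.execute(sql, parameters)
               # After the call the bind variables hold the values written by the
               # procedure; the returned collection is what gets pushed as XCom.
               if isinstance(cursor.bindvars, list):
                   return [v.getvalue() for v in cursor.bindvars]
               if isinstance(cursor.bindvars, dict):
                   return {n: v.getvalue() for (n, v) in cursor.bindvars.items()}
               return None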
   ```
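
To make the placeholder and return-value logic above easier to inspect without a database, here is a minimal standalone sketch of the same two steps. The helper names `build_call_sql` and `collect_bind_values`, the `FakeVar` stand-in class and the procedure name `my_pkg.my_proc` are all hypothetical and not part of the pull request. The sketch only assumes that a bind variable exposes `getvalue()`, as used in the operator.

```python
from typing import Any, Iterable, Mapping, Optional, Union


def build_call_sql(
    function: str,
    parameters: Optional[Union[Mapping, Iterable]] = None,
) -> str:
    """Build the anonymous PL/SQL block that calls `function` (hypothetical helper)."""
    if parameters is None:
        names = []
    elif isinstance(parameters, Mapping):
        # Named binds: one ":name" placeholder per key, in insertion order.
        names = list(parameters)
    else:
        # Positional binds: ":1", ":2", ... one per supplied value.
        names = range(1, len(list(parameters)) + 1)
    args = ", ".join(f":{name}" for name in names)
    return f"BEGIN {function}({args}); END;"


def collect_bind_values(bindvars: Any) -> Any:
    """Unwrap each bind variable via getvalue(), mirroring the operator's return logic."""
    if isinstance(bindvars, list):
        return [v.getvalue() for v in bindvars]
    if isinstance(bindvars, dict):
        return {n: v.getvalue() for n, v in bindvars.items()}
    return None


class FakeVar:
    """Hypothetical stand-in for a database bind variable; only getvalue() is modelled."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def getvalue(self) -> Any:
        return self._value


# The dummy 0 marks "out_total" as a numeric out parameter.
print(build_call_sql("my_pkg.my_proc", {"in_id": 42, "out_total": 0}))
# BEGIN my_pkg.my_proc(:in_id, :out_total); END;
print(build_call_sql("my_pkg.my_proc", [42, 0]))
# BEGIN my_pkg.my_proc(:1, :2); END;
print(collect_bind_values({"in_id": FakeVar(42), "out_total": FakeVar(7)}))
# {'in_id': 42, 'out_total': 7}
```

Because in and out values are returned together, a downstream task reading the XCom would pick the out parameter by name (dict form) or by position (list form).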

