Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/30 08:41:59 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

potiuk commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r532423467



##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -67,27 +67,50 @@ def __init__(
         executor_memory: Optional[str] = None,
         keytab: Optional[str] = None,
         principal: Optional[str] = None,
-        master: str = 'yarn',

Review comment:
       This is not a backwards-compatible change. Can you please make it work also if someone calls it in the old way? We have a mechanism (just search for "deprecated") by which we mark certain parameters as deprecated, and the rule of thumb is that we deprecate first and only remove the deprecated features after the next major release. It seems possible to add (and test) the old way of using the hook here; see the sketch after this comment.
   
   Also, the docstrings were not updated, but they should be, including the deprecation information and an explanation of what has been replaced.
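
A minimal sketch of the deprecation pattern being suggested, assuming the removed `master` and `yarn_queue` parameters come back as optional arguments that emit a `DeprecationWarning`; the class name, defaults, and merge logic here are illustrative, not the PR's actual code:

```python
import warnings
from typing import Any, Dict, Optional


class SparkSqlHookSketch:
    """Illustrative only: keep the removed arguments working but deprecated."""

    def __init__(
        self,
        conn_id: str = 'spark_sql_default',
        master: Optional[str] = None,      # deprecated; previously defaulted to 'yarn'
        yarn_queue: Optional[str] = None,  # deprecated; previously defaulted to 'default'
    ) -> None:
        if master is not None or yarn_queue is not None:
            warnings.warn(
                "Passing 'master' and 'yarn_queue' is deprecated; configure them "
                "on the Airflow connection instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        self._conn_id = conn_id
        self._connection: Dict[str, Any] = self._resolve_connection()
        # Old-style explicit arguments override connection-derived values,
        # so existing callers keep their previous behaviour.
        if master is not None:
            self._connection['master'] = master
        if yarn_queue is not None:
            self._connection['queue'] = yarn_queue

    def _resolve_connection(self) -> Dict[str, Any]:
        # Stand-in for the connection-based resolution added by the PR.
        return {'master': 'yarn', 'queue': None, 'deploy_mode': None}
```

Under this sketch, an old-style call such as `SparkSqlHookSketch(master='local[*]')` keeps working and raises a `DeprecationWarning` instead of a `TypeError`.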

##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -67,27 +67,50 @@ def __init__(
         executor_memory: Optional[str] = None,
         keytab: Optional[str] = None,
         principal: Optional[str] = None,
-        master: str = 'yarn',
         name: str = 'default-name',
         num_executors: Optional[int] = None,
         verbose: bool = True,
-        yarn_queue: str = 'default',
     ) -> None:
         super().__init__()
         self._sql = sql
         self._conf = conf
-        self._conn = self.get_connection(conn_id)
+        self._conn_id = conn_id
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
         self._principal = principal
-        self._master = master
         self._name = name
         self._num_executors = num_executors
         self._verbose = verbose
-        self._yarn_queue = yarn_queue
         self._sp: Any = None
+        self._connection = self._resolve_connection()
+
+    def _resolve_connection(self) -> Dict[str, Any]:
+        # Build from connection master or default to yarn if not available
+        conn_data = {
+            'master': "yarn",
+            'queue': None,
+            'deploy_mode': None
+        }
+
+        try:
+            conn = self.get_connection(self._conn_id)
+            if conn.port:
+                conn_data['master'] = f"{conn.host}:{conn.port}"
+            else:
+                conn_data['master'] = conn.host
+
+            # Determine optional yarn queue from the extra field

Review comment:
       Can you please describe this new behaviour in the documentation (including the deprecated way, marking it as such)? We have newly restructured documentation, and it is now very easy to find the relevant part (just rebase to the latest master and see `docs/apache-airflow-providers-apache-spark`).
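
For reference, the resolution logic in the hunk above appears to follow this shape when read end to end; since the diff is truncated at the review comment, the `extra_dejson` keys (`queue`, `deploy-mode`) and the `AirflowNotFoundException` fallback are assumptions rather than confirmed parts of the PR:

```python
from typing import Any, Dict

from airflow.exceptions import AirflowNotFoundException
from airflow.hooks.base_hook import BaseHook  # 'airflow.hooks.base' in newer Airflow


class ResolutionSketch(BaseHook):
    """Illustrative only: the connection-resolution behaviour to document."""

    def __init__(self, conn_id: str = 'spark_sql_default') -> None:
        super().__init__()
        self._conn_id = conn_id

    def _resolve_connection(self) -> Dict[str, Any]:
        # Build the master from the connection, defaulting to 'yarn' when the
        # connection is missing or incomplete.
        conn_data: Dict[str, Any] = {'master': 'yarn', 'queue': None, 'deploy_mode': None}
        try:
            conn = self.get_connection(self._conn_id)
            conn_data['master'] = f"{conn.host}:{conn.port}" if conn.port else conn.host
            # Assumed extras layout: optional yarn queue and deploy mode
            # carried in the connection's JSON extra field.
            extra = conn.extra_dejson
            conn_data['queue'] = extra.get('queue')
            conn_data['deploy_mode'] = extra.get('deploy-mode')
        except AirflowNotFoundException:
            self.log.info("Connection %s not found, defaulting to 'yarn'", self._conn_id)
        return conn_data
```

If that reading is right, the behaviour to document would be: host and port on the connection determine the master, the extras supply the queue and deploy mode, and a missing connection falls back to the old `yarn` default.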




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org