Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/12/01 01:46:27 UTC

[GitHub] [airflow] danielenricocahall opened a new pull request #12710: Enable SparkSqlHook to use supplied connections

danielenricocahall opened a new pull request #12710:
URL: https://github.com/apache/airflow/pull/12710


   In case of existing issue, reference it using one of the following:
   
   closes: https://github.com/apache/airflow/issues/8713
   
Minor update to the logic in `SparkSqlHook` to follow the same behavior as `SparkSubmitHook` by determining the master and connection parameters through the connection ID.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r533953467



##########
File path: tests/providers/apache/spark/hooks/test_spark_sql.py
##########
@@ -213,3 +209,29 @@ def test_spark_process_runcmd_and_fail(self, mock_popen):
                 sql, master, params, status
             ),
         )
+
+    def test_resolve_connection_yarn_default_connection(self):
+        hook = SparkSqlHook(conn_id='spark_default', sql='SELECT 1')

Review comment:
       This can be put into the `setUp()` test method; after that you will have access to the hook through `self` and avoid code duplication.
   ```
       def setUp(self):
           self.hook = SparkSqlHook(conn_id='spark_yarn_cluster', sql='SELECT 1')
   ```







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#issuecomment-735495150


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r533953467



##########
File path: tests/providers/apache/spark/hooks/test_spark_sql.py
##########
@@ -213,3 +209,29 @@ def test_spark_process_runcmd_and_fail(self, mock_popen):
                 sql, master, params, status
             ),
         )
+
+    def test_resolve_connection_yarn_default_connection(self):
+        hook = SparkSqlHook(conn_id='spark_default', sql='SELECT 1')

Review comment:
       This can be put into the `setUp()` test method; after that you will have access to the hook through `self` and avoid code duplication. `connection = hook._resolve_connection()` can also be moved into `setUp()`.
   ```
       def setUp(self):
           self.hook = SparkSqlHook(conn_id='spark_yarn_cluster', sql='SELECT 1')
           self.connection = self.hook._resolve_connection()
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#issuecomment-787209773


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] danielenricocahall commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
danielenricocahall commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r533006035



##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -67,27 +67,50 @@ def __init__(
         executor_memory: Optional[str] = None,
         keytab: Optional[str] = None,
         principal: Optional[str] = None,
-        master: str = 'yarn',

Review comment:
       @potiuk reverted to make backwards compatible and add deprecation explanation in docstrings!







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r533956161



##########
File path: tests/providers/apache/spark/hooks/test_spark_sql.py
##########
@@ -213,3 +209,29 @@ def test_spark_process_runcmd_and_fail(self, mock_popen):
                 sql, master, params, status
             ),
         )
+
+    def test_resolve_connection_yarn_default_connection(self):
+        hook = SparkSqlHook(conn_id='spark_default', sql='SELECT 1')

Review comment:
       I would go even further and use https://github.com/wolever/parameterized. 
   This is basically the same test case with different data (`expected_spark_connection`) that can be parametrized through `@parameterized()`. 
   Just saying. :)
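   As an illustrative sketch (not code from the PR; `resolve_master` below is a hypothetical stand-in for the hook's connection-resolution logic), a data-driven version of these near-identical tests could look like the following. It uses the stdlib `subTest` as an analogue; `@parameterized.expand` or pytest's `parametrize` express the same idea declaratively:
   ```python
   import unittest

   # Hypothetical stand-in for the master-resolution logic under test:
   # build "host:port" when a port is set, otherwise use the host as-is.
   def resolve_master(host, port=None):
       return f"{host}:{port}" if port else host

   class TestResolveConnection(unittest.TestCase):
       # Each tuple is one case: (host, port, expected master string).
       CASES = [
           ("yarn", None, "yarn"),
           ("spark://spark-master", 7077, "spark://spark-master:7077"),
           ("mesos://mesos-master", 5050, "mesos://mesos-master:5050"),
       ]

       def test_resolve_master(self):
           # One data-driven test instead of several copy-pasted methods.
           for host, port, expected in self.CASES:
               with self.subTest(host=host, port=port):
                   self.assertEqual(resolve_master(host, port), expected)
   ```
   Run with `python -m unittest`; with the `parameterized` library installed, the loop collapses to one decorated test method that reports each case separately.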







[GitHub] [airflow] michalslowikowski00 commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
michalslowikowski00 commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r534890540



##########
File path: tests/providers/apache/spark/hooks/test_spark_sql.py
##########
@@ -213,3 +209,29 @@ def test_spark_process_runcmd_and_fail(self, mock_popen):
                 sql, master, params, status
             ),
         )
+
+    def test_resolve_connection_yarn_default_connection(self):
+        hook = SparkSqlHook(conn_id='spark_default', sql='SELECT 1')

Review comment:
       Cool :D







[GitHub] [airflow] danielenricocahall commented on pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
danielenricocahall commented on pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#issuecomment-737624588


   @potiuk It looks like CI fails when pushing the prod images (`unauthorized: Your request could not be authenticated by the GitHub Packages service. Please ensure your access token is valid and has the appropriate scopes configured.`) - do I need to fix something on my end?





[GitHub] [airflow] danielenricocahall commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
danielenricocahall commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r533006276



##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -67,27 +67,50 @@ def __init__(
         executor_memory: Optional[str] = None,
         keytab: Optional[str] = None,
         principal: Optional[str] = None,
-        master: str = 'yarn',
         name: str = 'default-name',
         num_executors: Optional[int] = None,
         verbose: bool = True,
-        yarn_queue: str = 'default',
     ) -> None:
         super().__init__()
         self._sql = sql
         self._conf = conf
-        self._conn = self.get_connection(conn_id)
+        self._conn_id = conn_id
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
         self._principal = principal
-        self._master = master
         self._name = name
         self._num_executors = num_executors
         self._verbose = verbose
-        self._yarn_queue = yarn_queue
         self._sp: Any = None
+        self._connection = self._resolve_connection()
+
+    def _resolve_connection(self) -> Dict[str, Any]:
+        # Build from connection master or default to yarn if not available
+        conn_data = {
+            'master': "yarn",
+            'queue': None,
+            'deploy_mode': None
+        }
+
+        try:
+            conn = self.get_connection(self._conn_id)
+            if conn.port:
+                conn_data['master'] = f"{conn.host}:{conn.port}"
+            else:
+                conn_data['master'] = conn.host
+
+            # Determine optional yarn queue from the extra field

Review comment:
       @potiuk added some pieces in documentation - how does it look?







[GitHub] [airflow] danielenricocahall commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
danielenricocahall commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r534176653



##########
File path: tests/providers/apache/spark/hooks/test_spark_sql.py
##########
@@ -213,3 +209,29 @@ def test_spark_process_runcmd_and_fail(self, mock_popen):
                 sql, master, params, status
             ),
         )
+
+    def test_resolve_connection_yarn_default_connection(self):
+        hook = SparkSqlHook(conn_id='spark_default', sql='SELECT 1')

Review comment:
       Great idea! Yeah, I am familiar with the concept, as pytest has `parametrize`, which I use. I will make those changes and resubmit sometime today.







[GitHub] [airflow] danielenricocahall commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
danielenricocahall commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r534593787



##########
File path: tests/providers/apache/spark/hooks/test_spark_sql.py
##########
@@ -213,3 +209,29 @@ def test_spark_process_runcmd_and_fail(self, mock_popen):
                 sql, master, params, status
             ),
         )
+
+    def test_resolve_connection_yarn_default_connection(self):
+        hook = SparkSqlHook(conn_id='spark_default', sql='SELECT 1')

Review comment:
       @michalslowikowski00 done!







[GitHub] [airflow] github-actions[bot] commented on pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#issuecomment-737618864


   [The Workflow run](https://github.com/apache/airflow/actions/runs/397347841) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] ashb commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r551924009



##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -44,15 +44,19 @@ class SparkSqlHook(BaseHook):
     :type executor_memory: str
     :param keytab: Full path to the file that contains the keytab
     :type keytab: str
-    :param master: spark://host:port, mesos://host:port, yarn, or local
+    :param master: (Deprecated) spark://host:port, mesos://host:port, yarn, or local
+        This parameter has been deprecated. Master and connection parameters (such as YARN queue)
+        are determined by the conn_id parameter.
     :type master: str
     :param name: Name of the job.
     :type name: str
     :param num_executors: Number of executors to launch
     :type num_executors: int
     :param verbose: Whether to pass the verbose flag to spark-sql
     :type verbose: bool
-    :param yarn_queue: The YARN queue to submit to (Default: "default")
+    :param yarn_queue: (Deprecated) The YARN queue to submit to (Default: "default")

Review comment:
       You haven't just deprecated this -- you've made it do nothing at all.
   
   Please make this argument still be respected if it is passed.
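   One common way to keep a deprecated argument working (a hedged sketch, not the actual Airflow code; the names here are illustrative) is to let an explicitly passed value win over the connection-derived one while emitting a `DeprecationWarning`:
   ```python
   import warnings

   # Illustrative: pick the YARN queue, honoring a deprecated explicit
   # `yarn_queue` argument over the value derived from the connection.
   def resolve_queue(conn_queue, yarn_queue=None):
       if yarn_queue is not None:
           warnings.warn(
               "`yarn_queue` is deprecated; configure the queue in the "
               "connection's extra field instead.",
               DeprecationWarning,
               stacklevel=2,
           )
           return yarn_queue  # explicitly passed argument still respected
       return conn_queue if conn_queue is not None else "default"
   ```
   Callers using the old signature keep their behavior and get a warning pointing at the replacement, which matches the deprecate-first, remove-later policy discussed in this thread.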

##########
File path: airflow/providers/apache/spark/operators/spark_sql.py
##########
@@ -56,6 +58,8 @@ class SparkSqlOperator(BaseOperator):
     :param verbose: Whether to pass the verbose flag to spark-sql
     :type verbose: bool
     :param yarn_queue: The YARN queue to submit to (Default: "default")
+        This parameter has been deprecated. Master and connection parameters (such as YARN queue)

Review comment:
       I don't think we want to deprecate this -- it is a valid option to override the queue for just a single operator, for instance.







[GitHub] [airflow] github-actions[bot] closed pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #12710:
URL: https://github.com/apache/airflow/pull/12710


   





[GitHub] [airflow] danielenricocahall closed pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
danielenricocahall closed pull request #12710:
URL: https://github.com/apache/airflow/pull/12710


   





[GitHub] [airflow] potiuk commented on a change in pull request #12710: Enable SparkSqlHook to use supplied connections

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #12710:
URL: https://github.com/apache/airflow/pull/12710#discussion_r532423467



##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -67,27 +67,50 @@ def __init__(
         executor_memory: Optional[str] = None,
         keytab: Optional[str] = None,
         principal: Optional[str] = None,
-        master: str = 'yarn',

Review comment:
       This is not a backwards-compatible change. Can you please make it work also if someone calls it in the old way? We have a mechanism for marking certain parameters as deprecated (just search for "deprecated"), but our rule of thumb is that we deprecate first and only remove the deprecated features after the next major release. It seems possible to add (and test) the old way of using the hook here.
   
   Also the docstrings were not updated - but they should be - including the deprecation information and an explanation of what has been replaced.

##########
File path: airflow/providers/apache/spark/hooks/spark_sql.py
##########
@@ -67,27 +67,50 @@ def __init__(
         executor_memory: Optional[str] = None,
         keytab: Optional[str] = None,
         principal: Optional[str] = None,
-        master: str = 'yarn',
         name: str = 'default-name',
         num_executors: Optional[int] = None,
         verbose: bool = True,
-        yarn_queue: str = 'default',
     ) -> None:
         super().__init__()
         self._sql = sql
         self._conf = conf
-        self._conn = self.get_connection(conn_id)
+        self._conn_id = conn_id
         self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
         self._principal = principal
-        self._master = master
         self._name = name
         self._num_executors = num_executors
         self._verbose = verbose
-        self._yarn_queue = yarn_queue
         self._sp: Any = None
+        self._connection = self._resolve_connection()
+
+    def _resolve_connection(self) -> Dict[str, Any]:
+        # Build from connection master or default to yarn if not available
+        conn_data = {
+            'master': "yarn",
+            'queue': None,
+            'deploy_mode': None
+        }
+
+        try:
+            conn = self.get_connection(self._conn_id)
+            if conn.port:
+                conn_data['master'] = f"{conn.host}:{conn.port}"
+            else:
+                conn_data['master'] = conn.host
+
+            # Determine optional yarn queue from the extra field

Review comment:
       Can you please describe this new behaviour in the documentation (including the deprecated way, marking it as such)? We have newly restructured documentation and it is now very easy to find the relevant part (just rebase to the latest master and see `docs/apache-airflow-providers-apache-spark`).
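   A hedged sketch of what reading optional settings from a connection's `extra` JSON field might look like (the key names and fallbacks here are illustrative assumptions, not necessarily the ones the PR uses):
   ```python
   import json

   # Illustrative: pull optional queue/deploy-mode out of a connection's
   # "extra" JSON blob, falling back to None when absent or malformed.
   def parse_extra(extra):
       settings = {"queue": None, "deploy_mode": None}
       if not extra:
           return settings
       try:
           payload = json.loads(extra)
       except json.JSONDecodeError:
           return settings
       settings["queue"] = payload.get("queue")
       settings["deploy_mode"] = payload.get("deploy-mode")
       return settings
   ```
   Swallowing malformed JSON and returning defaults keeps the hook usable with a bare connection; a stricter design could raise instead.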



