Posted to reviews@spark.apache.org by "cdkrot (via GitHub)" <gi...@apache.org> on 2023/10/31 01:38:26 UTC

[PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

cdkrot opened a new pull request, #43591:
URL: https://github.com/apache/spark/pull/43591

   ### What changes were proposed in this pull request?
   
   Support multiple retry policies defined at the same time. Each policy determines which error types it can retry and how exactly those should be spread out.
   
   ### Why are the changes needed?
   
   Different error types should be treated differently. For instance, network connectivity errors and remote resources that are still being initialized should be handled separately.
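
A minimal sketch of that idea, not the code from this PR: two policies that each claim a different error type, with the first matching policy chosen in list order. `NetworkErrorPolicy`, `ResourceStartingPolicy` and `first_matching` are hypothetical names, and the built-in exception types stand in for whatever errors the real client would inspect.

```python
from typing import List, Optional


class RetryPolicy:
    """Simplified stand-in for the RetryPolicy class discussed in this PR."""

    def __init__(self, max_retries: Optional[int] = None, initial_backoff: int = 1000):
        self.max_retries = max_retries
        self.initial_backoff = initial_backoff  # milliseconds

    @property
    def name(self) -> str:
        return self.__class__.__name__

    def can_retry(self, exception: BaseException) -> bool:
        return False


class NetworkErrorPolicy(RetryPolicy):  # hypothetical policy
    def can_retry(self, exception: BaseException) -> bool:
        return isinstance(exception, ConnectionError)


class ResourceStartingPolicy(RetryPolicy):  # hypothetical policy
    def can_retry(self, exception: BaseException) -> bool:
        return isinstance(exception, TimeoutError)


def first_matching(policies: List[RetryPolicy], exception: BaseException) -> Optional[str]:
    """Return the name of the first policy willing to retry the error, if any."""
    for policy in policies:
        if policy.can_retry(exception):
            return policy.name
    return None


policies = [
    NetworkErrorPolicy(max_retries=15, initial_backoff=50),
    ResourceStartingPolicy(max_retries=3, initial_backoff=5000),
]
```

With this setup a connection reset would be retried quickly and often, while a timeout would be retried a few times with a long pause, and any other error would not be retried at all.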
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1805594233

   Apparently this conflicted with some of @grundprinzip's changes that you merged just before mine :). I resolved the conflicts; it should be mergeable after CI.




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383539204


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +653,36 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        """
+        Registers specified policy in the dictionary of known policies.
+
+        To activate it, use set_retry_policies().
+        """
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[Union[str, RetryPolicy]]):
+        """
+        Sets list of policies to be used for retries.
+        I.e. set_retry_policies(["DefaultPolicy", "CustomPolicy"]).
+
+        If policy is given as string, the policy object is sourced from previously
+        registered policies.
+        Specifying policy directly bypasses the registration mechanism.
+        """
+        self._retry_policies = [
+            policy if isinstance(policy, RetryPolicy) else self._known_retry_policies[policy]
+            for policy in policies
+        ]
+
+    def get_retry_policies(self) -> List[str]:
+        """
+        Return list of currently used policies, i.e. ["DefaultPolicy", "MyCustomPolicy"].
+        """
+        return [policy.name for policy in self._retry_policies]

Review Comment:
   In my opinion, registering policies by name is still superfluous, especially since `policy.name` is currently generated only from the policy class name, so it's not possible to use a different instance of the same policy with different parameters (e.g. different sleep timings). So I still think that any power user who wants to customize retries would just define them with their own instances of the objects.
   I find two things confusing here. First, policies are registered under a name in `known_retry_policies`, but the key is derived only from the class name, so a different instance of the same class clashes with the existing entry. Second, `set_retry_policies` accepts either a string from `known_retry_policies` or an instance, and the instance may not be part of `known_retry_policies`, so when `get_retry_policies` returns it by name, the user can't do anything with that name.
   
   WDYT @HyukjinKwon @grundprinzip  ?



##########
python/pyspark/sql/connect/client/retries.py:
##########
@@ -0,0 +1,282 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import grpc
+import random
+import time
+import typing
+from typing import Optional, Callable, Generator, List, Type
+from types import TracebackType
+from pyspark.sql.connect.client.logging import logger
+
+"""
+This module contains retry system. The system is designed to be
+significantly customizable.
+
+A key aspect of retries is RetryPolicy class, describing a single policy.
+There can be more than one policy defined at the same time. Each policy
+determines which error types it can retry and how exactly.
+
+For instance, networking errors should likely be retried differently that
+remote resource being available.
+
+Given a sequence of policies, retry logic applies all of them in sequential
+order, keeping track of different policies budgets.
+"""
+
+
+class RetryPolicy:
+    """
+    Describes key aspects of RetryPolicy.
+
+    It's advised that different policies are implemented as different subclasses.
+    """
+
+    def __init__(
+        self,
+        max_retries: Optional[int] = None,
+        initial_backoff: int = 1000,
+        max_backoff: Optional[int] = None,
+        backoff_multiplier: float = 1.0,
+        jitter: int = 0,
+        min_jitter_threshold: int = 0,
+    ):
+        self.max_retries = max_retries
+        self.initial_backoff = initial_backoff
+        self.max_backoff = max_backoff
+        self.backoff_multiplier = backoff_multiplier
+        self.jitter = jitter
+        self.min_jitter_threshold = min_jitter_threshold
+
+    @property
+    def name(self):
+        return self.__class__.__name__

Review Comment:
   Using `self.__class__.__name__` will prevent us from registering a separate instance of the same class with different parameters, e.g. a different number of retries or different timing.
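
The collision can be reproduced with a stripped-down sketch. `Client` below is a hypothetical stand-in for `SparkConnectClient` that keeps only the registration logic quoted in the `core.py` hunk; everything else is omitted.

```python
from typing import Dict, Optional


class RetryPolicy:
    """Simplified policy whose name is derived from the class name only."""

    def __init__(self, max_retries: Optional[int] = None, initial_backoff: int = 1000):
        self.max_retries = max_retries
        self.initial_backoff = initial_backoff

    @property
    def name(self) -> str:
        return self.__class__.__name__


class Client:  # hypothetical stand-in for SparkConnectClient
    def __init__(self) -> None:
        self._known_retry_policies: Dict[str, RetryPolicy] = {}

    def register_retry_policy(self, policy: RetryPolicy) -> None:
        if policy.name in self._known_retry_policies:
            raise ValueError("Already known policy")
        self._known_retry_policies[policy.name] = policy


client = Client()
client.register_retry_policy(RetryPolicy(max_retries=3))

# Same class, different parameters: both instances report the name "RetryPolicy".
try:
    client.register_retry_policy(RetryPolicy(max_retries=10, initial_backoff=50))
    collided = False
except ValueError:
    collided = True
```

Here the second registration is rejected, so only the first set of parameters is reachable by name.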





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1804252072

   Should be good to go




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1798120757

   @juliuszsompolski 
   I think deriving the policy identifier from the class name is a rather good default, but I also recognize your concern that someone might want to create a new policy without going through class inheritance.
   
   I made a change that makes the name easier to override while still keeping the class name as the default.
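
One way such an override could look is sketched below. This is an assumption for illustration, not necessarily the change that landed in the PR: the optional `name` constructor argument and the `_name` attribute are hypothetical.

```python
from typing import Optional


class RetryPolicy:
    """Sketch: the class name stays the default, but an instance may override it."""

    def __init__(self, max_retries: Optional[int] = None, name: Optional[str] = None):
        self.max_retries = max_retries
        self._name = name  # hypothetical attribute, not taken from the PR

    @property
    def name(self) -> str:
        # Fall back to the class name when no explicit name was given.
        return self._name if self._name is not None else self.__class__.__name__


default = RetryPolicy(max_retries=3)
patient = RetryPolicy(max_retries=20, name="PatientPolicy")

# Two instances of the same class can now coexist in a name-keyed registry.
registry = {policy.name: policy for policy in (default, patient)}
```

Under this assumption, subclassing remains the simplest path, while ad hoc instances only need a distinct `name`.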




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383117798


##########
python/pyspark/sql/connect/client/retries.py:
##########
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import time
+import typing
+from typing import Optional, Callable, Generator, List, Type
+from types import TracebackType
+from pyspark.sql.connect.client.logging import logger
+
+"""
+This module contains retry system. The system is designed to be
+significantly customizable.
+
+A key aspect of retries is RetryPolicy class, describing a single policy.
+There can be more than one policy defined at the same time. Each policy
+determines which error types it can retry and how exactly.
+
+For instance, networking errors should likely be retried differently that
+remote resource being available.
+
+Given a sequence of policies, retry logic applies all of them in sequential
+order, keeping track of different policies budgets.
+"""
+
+
+class RetryPolicy:
+    """
+    Describes key aspects of RetryPolicy.
+
+    It's advised that different policies are implemented as different subclasses.
+    """
+
+    def __init__(
+        self,
+        max_retries: Optional[int] = None,
+        initial_backoff: int = 1000,
+        max_backoff: Optional[int] = None,
+        backoff_multiplier: float = 1.0,
+        jitter: int = 0,
+        min_jitter_threshold: int = 0,
+    ):
+        self.max_retries = max_retries
+        self.initial_backoff = initial_backoff
+        self.max_backoff = max_backoff
+        self.backoff_multiplier = backoff_multiplier
+        self.jitter = jitter
+        self.min_jitter_threshold = min_jitter_threshold
+
+    @property
+    def name(self):
+        return self.__class__.__name__
+
+    def can_retry(self, exception: BaseException):
+        return False
+
+    def to_state(self) -> "RetryPolicyState":
+        return RetryPolicyState(self)
+
+
+class RetryPolicyState:
+    """
+    This class represents stateful part of the specific policy.
+    """
+
+    def __init__(self, policy: RetryPolicy):
+        self._policy = policy
+
+        # Will allow attempts [0, self._policy.max_retries)
+        self._attempt = 0
+        self._next_wait: float = self._policy.initial_backoff
+
+    @property
+    def policy(self):
+        return self._policy
+
+    @property
+    def name(self):
+        return self.policy.name
+
+    def can_retry(self, exception: BaseException):
+        return self.policy.can_retry(exception)
+
+    def next_attempt(self) -> Optional[int]:
+        """
+        Returns
+        -------
+            Randomized time (in milliseconds) to wait until this attempt
+            or None if this policy doesn't allow more retries.
+        """
+
+        if self.policy.max_retries is not None and self._attempt >= self.policy.max_retries:
+            # No more retries under this policy
+            return None
+
+        self._attempt += 1
+        wait_time = self._next_wait
+
+        # Calculate future backoff
+        if self.policy.max_backoff is not None:
+            self._next_wait = min(
+                float(self.policy.max_backoff), wait_time * self.policy.backoff_multiplier
+            )
+
+        # Jitter current backoff, after the future backoff was computed
+        if wait_time >= self.policy.min_jitter_threshold:
+            wait_time += random.uniform(0, self.policy.jitter)
+
+        # Round to whole number of milliseconds
+        return int(wait_time)
+
+
+class AttemptManager:
+    """
+    Simple ContextManager that is used to capture the exception thrown inside the context.
+    """
+
+    def __init__(self, retrying: "Retrying") -> None:
+        self._retrying = retrying
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exception: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        if isinstance(exception, BaseException):
+            # Swallow the exception.
+            if self._retrying.accept_exception(exception):
+                return True
+            # Bubble up the exception.
+            return False
+        else:
+            self._retrying.accept_succeeded()
+            return None
+
+
+class Retrying:
+    """
+    This class is a point of entry into the retry logic.
+    The class accepts a list of retry policies and applies them in given order.
+    The first policy accepting an exception will be used.
+    """
+
+    def __init__(
+        self,
+        policies: typing.Union[RetryPolicy, typing.Iterable[RetryPolicy]],
+        sleep: Callable[[float], None] = time.sleep,
+    ) -> None:
+        if isinstance(policies, RetryPolicy):
+            policies = [policies]
+        self._policies: List[RetryPolicyState] = [policy.to_state() for policy in policies]
+        self._sleep = sleep
+
+        self._exception: Optional[BaseException] = None
+        self._done = False
+
+    def can_retry(self, exception: BaseException) -> bool:
+        return any(policy.can_retry(exception) for policy in self._policies)
+
+    def accept_exception(self, exception: BaseException) -> bool:
+        if self.can_retry(exception):
+            self._exception = exception
+            return True
+        return False
+
+    def accept_succeeded(self):
+        self._done = True
+
+    def _last_exception(self) -> BaseException:
+        if self._exception is None:
+            raise RuntimeError("No active exception")
+        return self._exception
+
+    def _wait(self):
+        exception = self._last_exception()
+
+        # Attempt to find a policy to wait with
+        for policy in self._policies:
+            if not policy.can_retry(exception):
+                continue
+
+            wait_time = policy.next_attempt()
+            if wait_time is not None:
+                logger.debug(
+                    f"Got error: {exception}. "
+                    + f"Will retry after {wait_time} ms (policy: {policy.name})"
+                )
+
+                self._sleep(wait_time / 1000)
+                return
+
+        # Exceeded retries
+        logger.debug(f"Given up on retrying. error: {exception}")
+        raise exception

Review Comment:
   That seems sensible, we can do that





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383280435


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   I'm just wondering whether any user who is enough of a "power" user to change these policies wouldn't be comfortable tossing these objects around anyway. E.g. if they wanted to tweak the parameters that you mention, they would have to deal with the object anyway: instantiate it with different parameters, register it, and then set it.
   For such a power user, it may actually be cleaner to work with these objects directly instead of having a two-step registration and setting.
   
   But I don't have a strong opinion here, I am also fine with keeping these extra APIs.





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1786303204

   cc @HyukjinKwon, @juliuszsompolski, @grundprinzip 




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383076362


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   My rationale here was that it makes it easier for users to configure retry policies without needing to toss objects around. A policy consists not just of the class but also of the parameters set on the instance.
   
   For example, suppose you have a client that shipped with policies ["policyA", "policyB", "policyC"], and for some reason you aren't happy with policyB. Then it's easier to configure this in a single call without having to obtain the objects for policyA and policyC. It's also convenient to add your own PolicyD to the mix.





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1380491812


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -550,13 +545,11 @@ def fromProto(cls, pb: pb2.ConfigResponse) -> "ConfigResult":
         )
 
 
-class SparkConnectClient(object):
-    """
-    Conceptually the remote spark session that communicates with the server
-    """
+class DefaultPolicy(RetryPolicy):

Review Comment:
   nit: DefaultRetryPolicy



##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   Do we need the functionality of registering policies that we are not going to use?
   Is there a use case for adding and removing policies?
   Unless there's a good reason to have a separate `register` and `set`, maybe simplify it so that just `set` is enough?



##########
python/pyspark/sql/connect/client/retries.py:
##########
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import time
+import typing
+from typing import Optional, Callable, Generator, List, Type
+from types import TracebackType
+from pyspark.sql.connect.client.logging import logger
+
+"""
+This module contains retry system. The system is designed to be
+significantly customizable.
+
+A key aspect of retries is RetryPolicy class, describing a single policy.
+There can be more than one policy defined at the same time. Each policy
+determines which error types it can retry and how exactly.
+
+For instance, networking errors should likely be retried differently that
+remote resource being available.
+
+Given a sequence of policies, retry logic applies all of them in sequential
+order, keeping track of different policies budgets.
+"""
+
+
+class RetryPolicy:
+    """
+    Describes key aspects of RetryPolicy.
+
+    It's advised that different policies are implemented as different subclasses.
+    """
+
+    def __init__(
+        self,
+        max_retries: Optional[int] = None,
+        initial_backoff: int = 1000,
+        max_backoff: Optional[int] = None,
+        backoff_multiplier: float = 1.0,
+        jitter: int = 0,
+        min_jitter_threshold: int = 0,
+    ):
+        self.max_retries = max_retries
+        self.initial_backoff = initial_backoff
+        self.max_backoff = max_backoff
+        self.backoff_multiplier = backoff_multiplier
+        self.jitter = jitter
+        self.min_jitter_threshold = min_jitter_threshold
+
+    @property
+    def name(self):
+        return self.__class__.__name__
+
+    def can_retry(self, exception: BaseException):
+        return False
+
+    def to_state(self) -> "RetryPolicyState":
+        return RetryPolicyState(self)
+
+
+class RetryPolicyState:
+    """
+    This class represents stateful part of the specific policy.
+    """
+
+    def __init__(self, policy: RetryPolicy):
+        self._policy = policy
+
+        # Will allow attempts [0, self._policy.max_retries)
+        self._attempt = 0
+        self._next_wait: float = self._policy.initial_backoff
+
+    @property
+    def policy(self):
+        return self._policy
+
+    @property
+    def name(self):
+        return self.policy.name
+
+    def can_retry(self, exception: BaseException):
+        return self.policy.can_retry(exception)
+
+    def next_attempt(self) -> Optional[int]:
+        """
+        Returns
+        -------
+            Randomized time (in milliseconds) to wait until this attempt
+            or None if this policy doesn't allow more retries.
+        """
+
+        if self.policy.max_retries is not None and self._attempt >= self.policy.max_retries:
+            # No more retries under this policy
+            return None
+
+        self._attempt += 1
+        wait_time = self._next_wait
+
+        # Calculate future backoff
+        if self.policy.max_backoff is not None:
+            self._next_wait = min(
+                float(self.policy.max_backoff), wait_time * self.policy.backoff_multiplier
+            )
+
+        # Jitter current backoff, after the future backoff was computed
+        if wait_time >= self.policy.min_jitter_threshold:
+            wait_time += random.uniform(0, self.policy.jitter)
+
+        # Round to whole number of milliseconds
+        return int(wait_time)
+
+
+class AttemptManager:
+    """
+    Simple ContextManager that is used to capture the exception thrown inside the context.
+    """
+
+    def __init__(self, retrying: "Retrying") -> None:
+        self._retrying = retrying
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exception: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        if isinstance(exception, BaseException):
+            # Swallow the exception.
+            if self._retrying.accept_exception(exception):
+                return True
+            # Bubble up the exception.
+            return False
+        else:
+            self._retrying.accept_succeeded()
+            return None
+
+
+class Retrying:
+    """
+    This class is a point of entry into the retry logic.
+    The class accepts a list of retry policies and applies them in given order.
+    The first policy accepting an exception will be used.
+    """
+
+    def __init__(
+        self,
+        policies: typing.Union[RetryPolicy, typing.Iterable[RetryPolicy]],
+        sleep: Callable[[float], None] = time.sleep,
+    ) -> None:
+        if isinstance(policies, RetryPolicy):
+            policies = [policies]
+        self._policies: List[RetryPolicyState] = [policy.to_state() for policy in policies]
+        self._sleep = sleep
+
+        self._exception: Optional[BaseException] = None
+        self._done = False
+
+    def can_retry(self, exception: BaseException) -> bool:
+        return any(policy.can_retry(exception) for policy in self._policies)

Review Comment:
   Different policies can have different attempt limits.
   By passing only `exception`, how can a policy determine whether it can retry without knowing, e.g., that this is the 10th attempt while the policy allows only 4 attempts?
   
   edit: I see, this will check if the exception can even possibly be retried, and the 
   ```
               wait_time = policy.next_attempt()
               if wait_time is not None:
   ```
   will actually check if next attempt is allowed.
   
   Maybe add this to the `can_retry` documentation, and a comment here, saying that this only checks whether the exception can be retried at all; whether the policy has retry attempts left is checked later.
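
The split between the two checks can be illustrated with a reduced sketch of the classes in the hunk above. Backoff growth and jitter are left out, and `FlakyPolicy` and `pick_wait` are hypothetical names used only for this example.

```python
from typing import List, Optional


class RetryPolicy:
    def __init__(self, max_retries: Optional[int] = None, initial_backoff: int = 1000):
        self.max_retries = max_retries
        self.initial_backoff = initial_backoff  # milliseconds

    def can_retry(self, exception: BaseException) -> bool:
        # Only answers "is this kind of error retryable at all?".
        # It knows nothing about how many attempts were already made.
        return False


class FlakyPolicy(RetryPolicy):  # hypothetical policy
    def can_retry(self, exception: BaseException) -> bool:
        return isinstance(exception, ConnectionError)


class RetryPolicyState:
    def __init__(self, policy: RetryPolicy):
        self.policy = policy
        self._attempt = 0

    def can_retry(self, exception: BaseException) -> bool:
        return self.policy.can_retry(exception)

    def next_attempt(self) -> Optional[int]:
        # The retry budget is enforced here, not in can_retry().
        if self.policy.max_retries is not None and self._attempt >= self.policy.max_retries:
            return None
        self._attempt += 1
        return self.policy.initial_backoff


def pick_wait(states: List[RetryPolicyState], exception: BaseException) -> Optional[int]:
    """Return the wait time from the first policy that accepts and has budget left."""
    for state in states:
        if not state.can_retry(exception):
            continue
        wait = state.next_attempt()
        if wait is not None:
            return wait
    return None  # every matching policy is exhausted


states = [RetryPolicyState(FlakyPolicy(max_retries=2, initial_backoff=100))]
error = ConnectionError("connection lost")
waits = [pick_wait(states, error) for _ in range(3)]
```

After the budget of two retries is spent, `can_retry` still returns `True` for the error, but `next_attempt` returns `None`, which is what ends the retry loop.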



##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -335,6 +321,5 @@ def __del__(self) -> None:
 
 class RetryException(Exception):
     """
-    An exception that can be thrown upstream when inside retry and which will be retryable
-    regardless of policy.
+    An exception that can be thrown upstream when inside retry and which is always retryable
     """

Review Comment:
   nit: maybe move to retries.py? I think it shouldn't have landed in this file in the first place.



##########
python/pyspark/sql/connect/client/retries.py:
##########
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import time
+import typing
+from typing import Optional, Callable, Generator, List, Type
+from types import TracebackType
+from pyspark.sql.connect.client.logging import logger
+
+"""
+This module contains retry system. The system is designed to be
+significantly customizable.
+
+A key aspect of retries is RetryPolicy class, describing a single policy.
+There can be more than one policy defined at the same time. Each policy
+determines which error types it can retry and how exactly.
+
+For instance, networking errors should likely be retried differently that
+remote resource being available.
+
+Given a sequence of policies, retry logic applies all of them in sequential
+order, keeping track of different policies budgets.
+"""
+
+
+class RetryPolicy:
+    """
+    Describes key aspects of RetryPolicy.
+
+    It's advised that different policies are implemented as different subclasses.
+    """
+
+    def __init__(
+        self,
+        max_retries: Optional[int] = None,
+        initial_backoff: int = 1000,
+        max_backoff: Optional[int] = None,
+        backoff_multiplier: float = 1.0,
+        jitter: int = 0,
+        min_jitter_threshold: int = 0,
+    ):
+        self.max_retries = max_retries
+        self.initial_backoff = initial_backoff
+        self.max_backoff = max_backoff
+        self.backoff_multiplier = backoff_multiplier
+        self.jitter = jitter
+        self.min_jitter_threshold = min_jitter_threshold
+
+    @property
+    def name(self):
+        return self.__class__.__name__
+
+    def can_retry(self, exception: BaseException):
+        return False
+
+    def to_state(self) -> "RetryPolicyState":
+        return RetryPolicyState(self)
+
+
+class RetryPolicyState:
+    """
+    This class represents stateful part of the specific policy.
+    """
+
+    def __init__(self, policy: RetryPolicy):
+        self._policy = policy
+
+        # Will allow attempts [0, self._policy.max_retries)
+        self._attempt = 0
+        self._next_wait: float = self._policy.initial_backoff
+
+    @property
+    def policy(self):
+        return self._policy
+
+    @property
+    def name(self):
+        return self.policy.name
+
+    def can_retry(self, exception: BaseException):
+        return self.policy.can_retry(exception)
+
+    def next_attempt(self) -> Optional[int]:
+        """
+        Returns
+        -------
+            Randomized time (in milliseconds) to wait until this attempt
+            or None if this policy doesn't allow more retries.
+        """
+
+        if self.policy.max_retries is not None and self._attempt >= self.policy.max_retries:
+            # No more retries under this policy
+            return None
+
+        self._attempt += 1
+        wait_time = self._next_wait
+
+        # Calculate future backoff
+        if self.policy.max_backoff is not None:
+            self._next_wait = min(
+                float(self.policy.max_backoff), wait_time * self.policy.backoff_multiplier
+            )
+
+        # Jitter current backoff, after the future backoff was computed
+        if wait_time >= self.policy.min_jitter_threshold:
+            wait_time += random.uniform(0, self.policy.jitter)
+
+        # Round to whole number of milliseconds
+        return int(wait_time)
+
+
+class AttemptManager:
+    """
+    Simple ContextManager that is used to capture the exception thrown inside the context.
+    """
+
+    def __init__(self, retrying: "Retrying") -> None:
+        self._retrying = retrying
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exception: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        if isinstance(exception, BaseException):
+            # Swallow the exception.
+            if self._retrying.accept_exception(exception):
+                return True
+            # Bubble up the exception.
+            return False
+        else:
+            self._retrying.accept_succeeded()
+            return None
+
+
+class Retrying:
+    """
+    This class is a point of entry into the retry logic.
+    The class accepts a list of retry policies and applies them in given order.
+    The first policy accepting an exception will be used.
+    """
+
+    def __init__(
+        self,
+        policies: typing.Union[RetryPolicy, typing.Iterable[RetryPolicy]],
+        sleep: Callable[[float], None] = time.sleep,
+    ) -> None:
+        if isinstance(policies, RetryPolicy):
+            policies = [policies]
+        self._policies: List[RetryPolicyState] = [policy.to_state() for policy in policies]
+        self._sleep = sleep
+
+        self._exception: Optional[BaseException] = None
+        self._done = False
+
+    def can_retry(self, exception: BaseException) -> bool:
+        return any(policy.can_retry(exception) for policy in self._policies)
+
+    def accept_exception(self, exception: BaseException) -> bool:
+        if self.can_retry(exception):
+            self._exception = exception
+            return True
+        return False
+
+    def accept_succeeded(self):
+        self._done = True
+
+    def _last_exception(self) -> BaseException:
+        if self._exception is None:
+            raise RuntimeError("No active exception")
+        return self._exception
+
+    def _wait(self):
+        exception = self._last_exception()
+
+        # Attempt to find a policy to wait with
+        for policy in self._policies:
+            if not policy.can_retry(exception):
+                continue
+
+            wait_time = policy.next_attempt()
+            if wait_time is not None:
+                logger.debug(
+                    f"Got error: {exception}. "
+                    + f"Will retry after {wait_time} ms (policy: {policy.name})"
+                )
+
+                self._sleep(wait_time / 1000)
+                return
+
+        # Exceeded retries
+        logger.debug(f"Given up on retrying. error: {exception}")
+        raise exception

Review Comment:
   From an offline discussion, and unrelated to the core "support multiple retry policies" change:
   maybe it would be nice to throw a specific exception type like "RetryExceeded", with all the previously retried exceptions wrapped inside? A user is unlikely to catch exceptions that are normally retried, but they may want to catch one special exception type that tells them which transient errors persisted, for how long, over how many retries, etc.
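   
   A rough sketch of what such a wrapper could look like. Everything here is hypothetical: the `RetryExceeded` class shape, the `call_with_retries` helper, and treating `ConnectionError` as the only transient error are assumptions for illustration, not part of this PR.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RetryExceeded(Exception):
    """Hypothetical error raised once the retry budget is spent."""

    def __init__(self, errors: List[BaseException], waited_ms: int) -> None:
        super().__init__(
            f"Gave up after {len(errors)} failed attempts and {waited_ms} ms of backoff"
        )
        self.errors = list(errors)  # every transient error seen, in order
        self.waited_ms = waited_ms  # total backoff requested between attempts


def call_with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    wait_ms: int = 10,
    # No-op sleep by default so the sketch runs instantly; pass time.sleep for real waits.
    sleep: Callable[[float], None] = lambda seconds: None,
) -> T:
    errors: List[BaseException] = []
    waited_ms = 0
    for attempt in range(max_attempts):
        try:
            return fn()
        except ConnectionError as e:  # the only error treated as transient here
            errors.append(e)
            if attempt + 1 < max_attempts:
                sleep(wait_ms / 1000)
                waited_ms += wait_ms
    # Chain the last transient error so the traceback keeps the root cause.
    raise RetryExceeded(errors, waited_ms) from (errors[-1] if errors else None)


def always_failing() -> None:
    raise ConnectionError("server unavailable")


try:
    call_with_retries(always_failing)
except RetryExceeded as exceeded:
    summary = (len(exceeded.errors), exceeded.waited_ms)
```

   With something along these lines, a caller catches a single exception type and can still inspect `errors` and `waited_ms` to see what kept failing.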





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #43591: [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies 
URL: https://github.com/apache/spark/pull/43591




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1807307318

   Merged to master.




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1798540009

   OK, I don't object to switching to what's proposed above; that also seems to do what's needed.




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383076362


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   We can consider more options, but my rationale here was that it makes it easier for users to configure retry policies without needing to pass objects around. A policy consists not just of the class but also of the parameters set on the instance.
   
   For example, suppose you have a client which shipped with policies ["policyA", "policyB", "policyC"], and for some reason you aren't happy with policyB. Then it's easier to configure this in a single call, without having to obtain the objects for policyA and policyC. It's also convenient to add your own PolicyD to the mix.
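   
   To illustrate the scenario above, here is a small sketch. `SketchPolicy`, `SketchClient` and the `PolicyA`..`PolicyD` classes are hypothetical stand-ins; only the two method bodies mirror the diff under review.

```python
from typing import Dict, List


class SketchPolicy:
    """Hypothetical stand-in for RetryPolicy; parameters live on the instance."""

    def __init__(self, max_retries: int = 3) -> None:
        self.max_retries = max_retries

    @property
    def name(self) -> str:
        return self.__class__.__name__


class PolicyA(SketchPolicy):
    pass


class PolicyB(SketchPolicy):
    pass


class PolicyC(SketchPolicy):
    pass


class PolicyD(SketchPolicy):
    pass


class SketchClient:
    """Hypothetical stand-in holding only the policy registry of the client."""

    def __init__(self, shipped: List[SketchPolicy]) -> None:
        self._known_retry_policies: Dict[str, SketchPolicy] = {}
        self._retry_policies: List[SketchPolicy] = []
        for policy in shipped:
            self.register_retry_policy(policy)
        self.set_retry_policies([policy.name for policy in shipped])

    def register_retry_policy(self, policy: SketchPolicy) -> None:
        if policy.name in self._known_retry_policies:
            raise ValueError("Already known policy")
        self._known_retry_policies[policy.name] = policy

    def set_retry_policies(self, policies: List[str]) -> None:
        self._retry_policies = [self._known_retry_policies[name] for name in policies]


# The client ships with A, B, C; the user swaps B for their own D by name.
client = SketchClient([PolicyA(), PolicyB(), PolicyC()])
client.register_retry_policy(PolicyD(max_retries=10))
client.set_retry_policies(["PolicyA", "PolicyD", "PolicyC"])
active = [(p.name, p.max_retries) for p in client._retry_policies]
```

   The user only constructs `PolicyD`; the already-configured `PolicyA` and `PolicyC` instances are picked up from the registry by name.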





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1794818200

   Also yes, a Scala follow-up is definitely planned, as soon as we finalize the design here.




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1788204673

   meow




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383329101


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   > they would anyway have to deal with the object
   
   That probably doesn't require obtaining the actual object, but rather an import statement from somewhere and constructing a new instance.








Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383375084


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   I made a change to the API, which now also allows passing objects directly.





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1805888408

   Ready to merge again




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1804965324

   argh, seems there's a conflict against latest master branch 😢 




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #43591:
URL: https://github.com/apache/spark/pull/43591#issuecomment-1788205796

   @grundprinzip and @nija-at let me know if this is good to go.




Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383273075


##########
python/pyspark/sql/connect/client/retries.py:
##########
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import time
+import typing
+from typing import Optional, Callable, Generator, List, Type
+from types import TracebackType
+from pyspark.sql.connect.client.logging import logger
+
+"""
+This module contains retry system. The system is designed to be
+significantly customizable.
+
+A key aspect of retries is RetryPolicy class, describing a single policy.
+There can be more than one policy defined at the same time. Each policy
+determines which error types it can retry and how exactly.
+
+For instance, networking errors should likely be retried differently that
+remote resource being available.
+
+Given a sequence of policies, retry logic applies all of them in sequential
+order, keeping track of different policies budgets.
+"""
+
+
+class RetryPolicy:
+    """
+    Describes key aspects of RetryPolicy.
+
+    It's advised that different policies are implemented as different subclasses.
+    """
+
+    def __init__(
+        self,
+        max_retries: Optional[int] = None,
+        initial_backoff: int = 1000,
+        max_backoff: Optional[int] = None,
+        backoff_multiplier: float = 1.0,
+        jitter: int = 0,
+        min_jitter_threshold: int = 0,
+    ):
+        self.max_retries = max_retries
+        self.initial_backoff = initial_backoff
+        self.max_backoff = max_backoff
+        self.backoff_multiplier = backoff_multiplier
+        self.jitter = jitter
+        self.min_jitter_threshold = min_jitter_threshold
+
+    @property
+    def name(self):
+        return self.__class__.__name__
+
+    def can_retry(self, exception: BaseException):
+        return False
+
+    def to_state(self) -> "RetryPolicyState":
+        return RetryPolicyState(self)
+
+
+class RetryPolicyState:
+    """
+    This class represents stateful part of the specific policy.
+    """
+
+    def __init__(self, policy: RetryPolicy):
+        self._policy = policy
+
+        # Will allow attempts [0, self._policy.max_retries)
+        self._attempt = 0
+        self._next_wait: float = self._policy.initial_backoff
+
+    @property
+    def policy(self):
+        return self._policy
+
+    @property
+    def name(self):
+        return self.policy.name
+
+    def can_retry(self, exception: BaseException):
+        return self.policy.can_retry(exception)
+
+    def next_attempt(self) -> Optional[int]:
+        """
+        Returns
+        -------
+            Randomized time (in milliseconds) to wait until this attempt
+            or None if this policy doesn't allow more retries.
+        """
+
+        if self.policy.max_retries is not None and self._attempt >= self.policy.max_retries:
+            # No more retries under this policy
+            return None
+
+        self._attempt += 1
+        wait_time = self._next_wait
+
+        # Calculate future backoff
+        if self.policy.max_backoff is not None:
+            self._next_wait = min(
+                float(self.policy.max_backoff), wait_time * self.policy.backoff_multiplier
+            )
+
+        # Jitter current backoff, after the future backoff was computed
+        if wait_time >= self.policy.min_jitter_threshold:
+            wait_time += random.uniform(0, self.policy.jitter)
+
+        # Round to whole number of milliseconds
+        return int(wait_time)
+
+
+class AttemptManager:
+    """
+    Simple ContextManager that is used to capture the exception thrown inside the context.
+    """
+
+    def __init__(self, retrying: "Retrying") -> None:
+        self._retrying = retrying
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exception: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        if isinstance(exception, BaseException):
+            # Swallow the exception.
+            if self._retrying.accept_exception(exception):
+                return True
+            # Bubble up the exception.
+            return False
+        else:
+            self._retrying.accept_succeeded()
+            return None
+
+
+class Retrying:
+    """
+    This class is a point of entry into the retry logic.
+    The class accepts a list of retry policies and applies them in given order.
+    The first policy accepting an exception will be used.
+    """
+
+    def __init__(
+        self,
+        policies: typing.Union[RetryPolicy, typing.Iterable[RetryPolicy]],
+        sleep: Callable[[float], None] = time.sleep,
+    ) -> None:
+        if isinstance(policies, RetryPolicy):
+            policies = [policies]
+        self._policies: List[RetryPolicyState] = [policy.to_state() for policy in policies]
+        self._sleep = sleep
+
+        self._exception: Optional[BaseException] = None
+        self._done = False
+
+    def can_retry(self, exception: BaseException) -> bool:
+        return any(policy.can_retry(exception) for policy in self._policies)

Review Comment:
   Yep, it matches the current design. Just took me a moment to understand so some additional comments might be useful.





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383329101


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +695,14 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[str]):
+        self._retry_policies = [self._known_retry_policies[name] for name in policies]

Review Comment:
   > they would anyway have to deal with the object
   
   That probably doesn't require obtaining the actual object, but rather an import statement from somewhere and constructing a new instance.
   
   I don't really object to other ways of doing it, just explaining why I did that :)





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383117165


##########
python/pyspark/sql/connect/client/retries.py:
##########
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import time
+import typing
+from typing import Optional, Callable, Generator, List, Type
+from types import TracebackType
+from pyspark.sql.connect.client.logging import logger
+
+"""
+This module contains retry system. The system is designed to be
+significantly customizable.
+
+A key aspect of retries is RetryPolicy class, describing a single policy.
+There can be more than one policy defined at the same time. Each policy
+determines which error types it can retry and how exactly.
+
+For instance, networking errors should likely be retried differently that
+remote resource being available.
+
+Given a sequence of policies, retry logic applies all of them in sequential
+order, keeping track of different policies budgets.
+"""
+
+
+class RetryPolicy:
+    """
+    Describes key aspects of RetryPolicy.
+
+    It's advised that different policies are implemented as different subclasses.
+    """
+
+    def __init__(
+        self,
+        max_retries: Optional[int] = None,
+        initial_backoff: int = 1000,
+        max_backoff: Optional[int] = None,
+        backoff_multiplier: float = 1.0,
+        jitter: int = 0,
+        min_jitter_threshold: int = 0,
+    ):
+        self.max_retries = max_retries
+        self.initial_backoff = initial_backoff
+        self.max_backoff = max_backoff
+        self.backoff_multiplier = backoff_multiplier
+        self.jitter = jitter
+        self.min_jitter_threshold = min_jitter_threshold
+
+    @property
+    def name(self):
+        return self.__class__.__name__
+
+    def can_retry(self, exception: BaseException):
+        return False
+
+    def to_state(self) -> "RetryPolicyState":
+        return RetryPolicyState(self)
+
+
+class RetryPolicyState:
+    """
+    This class represents stateful part of the specific policy.
+    """
+
+    def __init__(self, policy: RetryPolicy):
+        self._policy = policy
+
+        # Will allow attempts [0, self._policy.max_retries)
+        self._attempt = 0
+        self._next_wait: float = self._policy.initial_backoff
+
+    @property
+    def policy(self):
+        return self._policy
+
+    @property
+    def name(self):
+        return self.policy.name
+
+    def can_retry(self, exception: BaseException):
+        return self.policy.can_retry(exception)
+
+    def next_attempt(self) -> Optional[int]:
+        """
+        Returns
+        -------
+            Randomized time (in milliseconds) to wait until this attempt
+            or None if this policy doesn't allow more retries.
+        """
+
+        if self.policy.max_retries is not None and self._attempt >= self.policy.max_retries:
+            # No more retries under this policy
+            return None
+
+        self._attempt += 1
+        wait_time = self._next_wait
+
+        # Calculate future backoff
+        if self.policy.max_backoff is not None:
+            self._next_wait = min(
+                float(self.policy.max_backoff), wait_time * self.policy.backoff_multiplier
+            )
+
+        # Jitter current backoff, after the future backoff was computed
+        if wait_time >= self.policy.min_jitter_threshold:
+            wait_time += random.uniform(0, self.policy.jitter)
+
+        # Round to whole number of milliseconds
+        return int(wait_time)
+
+
+class AttemptManager:
+    """
+    Simple ContextManager that is used to capture the exception thrown inside the context.
+    """
+
+    def __init__(self, retrying: "Retrying") -> None:
+        self._retrying = retrying
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exception: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        if isinstance(exception, BaseException):
+            # Swallow the exception.
+            if self._retrying.accept_exception(exception):
+                return True
+            # Bubble up the exception.
+            return False
+        else:
+            self._retrying.accept_succeeded()
+            return None
+
+
+class Retrying:
+    """
+    This class is a point of entry into the retry logic.
+    The class accepts a list of retry policies and applies them in given order.
+    The first policy accepting an exception will be used.
+    """
+
+    def __init__(
+        self,
+        policies: typing.Union[RetryPolicy, typing.Iterable[RetryPolicy]],
+        sleep: Callable[[float], None] = time.sleep,
+    ) -> None:
+        if isinstance(policies, RetryPolicy):
+            policies = [policies]
+        self._policies: List[RetryPolicyState] = [policy.to_state() for policy in policies]
+        self._sleep = sleep
+
+        self._exception: Optional[BaseException] = None
+        self._done = False
+
+    def can_retry(self, exception: BaseException) -> bool:
+        return any(policy.can_retry(exception) for policy in self._policies)

Review Comment:
   I implemented this behavior specifically to match the current design, which also classifies errors as "retryable" or "not retryable" and keeps the logic for spacing retries separate.
   
   I can add some more comments.
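
A minimal sketch of that separation, assuming simplified stand-ins rather than the PR's actual classes: `can_retry` only classifies the exception, while a separate function decides the spacing. `SketchPolicy`, `backoff_schedule` and `first_accepting` are hypothetical names introduced for illustration, and jitter is left out so the schedule is deterministic.

```python
from typing import List, Optional


class SketchPolicy:
    """Hypothetical stand-in for a retry policy (not the PR's RetryPolicy)."""

    def __init__(
        self,
        max_retries: int,
        initial_backoff: int,
        max_backoff: int,
        backoff_multiplier: float,
    ) -> None:
        self.max_retries = max_retries
        self.initial_backoff = initial_backoff
        self.max_backoff = max_backoff
        self.backoff_multiplier = backoff_multiplier

    def can_retry(self, exception: BaseException) -> bool:
        # Classification only: says nothing about how long to wait.
        return isinstance(exception, ConnectionError)


def backoff_schedule(policy: SketchPolicy) -> List[int]:
    """Spacing only: wait times in milliseconds, with jitter omitted."""
    waits: List[int] = []
    wait = float(policy.initial_backoff)
    for _ in range(policy.max_retries):
        waits.append(int(wait))
        # Grow the next wait geometrically, capped at max_backoff.
        wait = min(float(policy.max_backoff), wait * policy.backoff_multiplier)
    return waits


def first_accepting(
    policies: List[SketchPolicy], exception: BaseException
) -> Optional[SketchPolicy]:
    # The first policy that classifies the error as retryable decides the spacing.
    return next((p for p in policies if p.can_retry(exception)), None)


policy = SketchPolicy(
    max_retries=4, initial_backoff=50, max_backoff=300, backoff_multiplier=2.0
)
chosen = first_accepting([policy], ConnectionError("connection reset"))
schedule = backoff_schedule(policy)
```

Under these assumptions, whether an error is retried and how the retries are spaced can change independently of each other.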





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "cdkrot (via GitHub)" <gi...@apache.org>.
cdkrot commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1383373607


##########
python/pyspark/sql/connect/client/reattach.py:
##########
@@ -335,6 +321,5 @@ def __del__(self) -> None:
 
 class RetryException(Exception):
     """
-    An exception that can be thrown upstream when inside retry and which will be retryable
-    regardless of policy.
+    An exception that can be thrown upstream when inside retry and which is always retryable
     """

Review Comment:
   Moved





Re: [PR] [SPARK-45733][CONNECT][PYTHON] Support multiple retry policies [spark]

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #43591:
URL: https://github.com/apache/spark/pull/43591#discussion_r1384793010


##########
python/pyspark/sql/connect/client/core.py:
##########
@@ -688,6 +653,36 @@ def enable_reattachable_execute(self) -> "SparkConnectClient":
         self._use_reattachable_execute = True
         return self
 
+    def register_retry_policy(self, policy: RetryPolicy):
+        """
+        Registers specified policy in the dictionary of known policies.
+
+        To activate it, use set_retry_policies().
+        """
+        if policy.name in self._known_retry_policies:
+            raise ValueError("Already known policy")
+        self._known_retry_policies[policy.name] = policy
+
+    def set_retry_policies(self, policies: List[Union[str, RetryPolicy]]):
+        """
+        Sets list of policies to be used for retries.
+        I.e. set_retry_policies(["DefaultPolicy", "CustomPolicy"]).
+
+        If policy is given as string, the policy object is sourced from previously
+        registered policies.
+        Specifying policy directly bypasses the registration mechanism.
+        """
+        self._retry_policies = [
+            policy if isinstance(policy, RetryPolicy) else self._known_retry_policies[policy]
+            for policy in policies
+        ]
+
+    def get_retry_policies(self) -> List[str]:
+        """
+        Return list of currently used policies, i.e. ["DefaultPolicy", "MyCustomPolicy"].
+        """
+        return [policy.name for policy in self._retry_policies]

Review Comment:
   I still don't see the point of the `register_retry_policy` API. Identifying `known_retry_policies` by a string that is just the class name is confusing, because different instances of the same class are easily mixed up, and identifying them by a specific object instance is confusing as well. I find this unnecessarily convoluted and unintuitive.
   
   I would propose to leave just
   ```
       def set_retry_policies(self, policies: List[RetryPolicy]):
   
       def get_retry_policies(self) -> List[RetryPolicy]:
   ```
   and get rid of the `_known_retry_policies`.
   
   I leave it to other reviewers to say if they disagree and think that the current API makes sense.
   
   Otherwise, LGTM.
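
To illustrate the name-collision concern, here is a minimal sketch assuming a stand-in policy whose `name` is its class name, as in the diff. `SketchPolicy` and the free function `register` are hypothetical; only the duplicate check mirrors `register_retry_policy`.

```python
from typing import Dict


class SketchPolicy:
    """Hypothetical stand-in for a retry policy; `name` is the class name."""

    def __init__(self, max_retries: int) -> None:
        self.max_retries = max_retries

    @property
    def name(self) -> str:
        return self.__class__.__name__


def register(known: Dict[str, SketchPolicy], policy: SketchPolicy) -> None:
    # Mirrors the duplicate check in register_retry_policy from the diff.
    if policy.name in known:
        raise ValueError("Already known policy")
    known[policy.name] = policy


known: Dict[str, SketchPolicy] = {}
patient = SketchPolicy(max_retries=15)
impatient = SketchPolicy(max_retries=2)

register(known, patient)
try:
    # Same class, different configuration: the names are identical.
    register(known, impatient)
    collided = False
except ValueError:
    collided = True
```

In this sketch two differently configured instances of one class cannot both be registered, and a lookup by name cannot tell them apart. Passing policy objects directly, as proposed above, would sidestep that.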


