Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/01 05:58:57 UTC

[GitHub] [flink] dianfu commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

dianfu commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911637191


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+

Review Comment:
   Could you update the unit tests to cover the added functionality?
   PS: test_connector.py has been moved to the directory pyflink/datastream/connectors/tests/, so the PR needs to be rebased when adding the tests.
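
   For example, a minimal test sketch, assuming the new tests sit next to the other connector tests in pyflink/datastream/connectors/tests/ and reuse PyFlinkTestCase as the base class (the test class and method names here are hypothetical):

   ```python
   from pyflink.datastream.connectors.cassandra import (
       LoadBalancingPolicy,
       ReconnectionPolicy,
   )
   from pyflink.testing.test_case_utils import PyFlinkTestCase


   class CassandraPolicyTest(PyFlinkTestCase):
       """Hypothetical tests for the new ClusterBuilder policy wrappers."""

       def test_round_robin_policy(self):
           # Wrapping the Java RoundRobinPolicy should yield a non-empty Java object.
           policy = LoadBalancingPolicy.round_robin_policy()
           self.assertIsNotNone(policy._j_load_balancing_policy)

       def test_constant_reconnection_policy(self):
           # The constant delay (ms) is forwarded to the Java ConstantReconnectionPolicy.
           policy = ReconnectionPolicy.constant_reconnection_policy(1000)
           self.assertIsNotNone(policy._j_reconnection_policy)
   ```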



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of host up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':

Review Comment:
   ```suggestion
       def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
   ```
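
   With that rename applied, the method could read roughly as follows; the body is unchanged from the diff, since Policies.defaultLoadBalancingPolicy() already returns a token-aware DCAwareRoundRobinPolicy, and only the name and docstring are adjusted (a sketch, not the exact wording of the PR):

   ```python
   @staticmethod
   def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
       """
       The driver's default load balancing policy: DCAwareRoundRobinPolicy
       wrapped with token awareness.
       """
       JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
       return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
   ```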



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -135,6 +279,168 @@ class ClusterBuilder(object):
     def __init__(self, j_cluster_builder):
         self._j_cluster_builder = j_cluster_builder

Review Comment:
   If we introduce a class SimpleClusterBuilder, we could set self._j_cluster_builder to a SimpleClusterBuilder() instance in the constructor.
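
   A minimal sketch of what the constructor could then look like, assuming a Java-side SimpleClusterBuilder helper is added to the connector (the class name and its package are hypothetical):

   ```python
   class ClusterBuilder(object):

       def __init__(self, j_cluster_builder=None):
           if j_cluster_builder is None:
               jvm = get_gateway().jvm
               # Hypothetical Java helper; the actual class name and package depend
               # on what gets added on the Java side of the connector.
               j_cluster_builder = \
                   jvm.org.apache.flink.streaming.connectors.cassandra.SimpleClusterBuilder()
           self._j_cluster_builder = j_cluster_builder
   ```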



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of host up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with token awareness.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the
+        next one (following the round-robin order) is tried, until all hosts have been tried.
+
+        This policy is not datacenter aware and will include every known Cassandra host in its
+        round-robin algorithm. If you use multiple datacenters this will be inefficient, and you will
+        want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be
+    created (in effect, the driver resets the scheduler).
+
+    The default ExponentialReconnectionPolicy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits exponentially longer between each reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry should be specified.
+    - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the
+              operation that triggered the Cassandra exception will return an empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def default_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.

Review Comment:
   It's not clear what the behavior of the default retry policy is. It would be great to document it clearly and also to update the method name accordingly.
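
   For reference, a sketch of how the method could spell out the behaviour of the underlying DataStax DefaultRetryPolicy; the semantics described are those of driver 3.x and should be verified against the driver version the connector ships with, and the body simply mirrors the other policy wrappers in the diff:

   ```python
   @staticmethod
   def default_retry_policy() -> 'RetryPolicy':
       """
       The default retry policy of the DataStax driver (DefaultRetryPolicy).

       A read that timed out is retried once if enough replicas replied but the
       requested data was not retrieved; a write that timed out is retried once
       only when the write of the distributed batch log failed; when the
       coordinator reports that not enough replicas are alive, the request is
       tried once on the next host. All other failures are rethrown.
       """
       JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
       return RetryPolicy(JPolicies.defaultRetryPolicy())
   ```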



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -135,6 +279,168 @@ class ClusterBuilder(object):
     def __init__(self, j_cluster_builder):
         self._j_cluster_builder = j_cluster_builder
 
+    def with_cluster_name(self, name: str) -> 'ClusterBuilder':
+        """
+        An optional name for the created cluster.
+
+        Note: this is not related to the Cassandra cluster name (though you are free to provide the
+        same name).
+        """
+        self._j_cluster_builder.withClusterName(name)

Review Comment:
   Where does the _j_cluster_builder come from? I guess we need to introduce a class SimpleClusterBuilder.
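
   Once a default constructor along those lines exists, the user-facing side could be exercised roughly like this, assuming each with_* setter returns self as the 'ClusterBuilder' return annotation suggests (only with_cluster_name is visible in this hunk; the other setters added by the PR would chain the same way):

   ```python
   # Hypothetical usage of the fluent builder.
   builder = ClusterBuilder().with_cluster_name('my-flink-cluster')
   ```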



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org