Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/01 03:10:11 UTC

[GitHub] [flink] pengmide opened a new pull request, #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

pengmide opened a new pull request, #20120:
URL: https://github.com/apache/flink/pull/20120

   ## What is the purpose of the change
   
   Support Cassandra connector in Python DataStream API.
   
   ## Brief change log
   
     - Introduces policy methods in `ClusterBuilder`.
   
   ## Verifying this change
   
     - CassandraSinkTest could verify the ClusterBuilder in CassandraSink.
     
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pengmide commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
pengmide commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r915551455


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with token awareness.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the
+        next one (following the round-robin order) is tried, until all hosts have been tried.
+
+        This policy is not datacenter aware and will include every known Cassandra host in its
+        round-robin algorithm. If you use multiple datacenters this will be inefficient, and you
+        will want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be
+    created (in effect, the driver resets the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        The default reconnection policy.
+
+        A reconnection policy that waits exponentially longer between each reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry should be specified.
+    - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the
+              operation that triggered the Cassandra exception will return an empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def consistency_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+
+        This policy retries queries only in the following cases:
+        - On a read timeout, retries once on the same host if enough replicas replied but data was
+          not retrieved.
+        - On a write timeout, retries once on the same host if we time out while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, retries on the next host.
+
+        It does not retry on read or write failures.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """

Review Comment:
   The `LoggingRetryPolicy(RetryPolicy policy)` constructor takes a parameter of the interface type `RetryPolicy`, and that `policy` argument cannot be serialized. Do you have a better idea?





[GitHub] [flink] pengmide commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
pengmide commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911802784


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':

Review Comment:
   What about fixing this in a separate PR as this PR is already very big?





[GitHub] [flink] pengmide commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
pengmide commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r915548401


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metrics;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.policies.AddressTranslator;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.Policies;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+/** A simple ClusterBuilder that is currently used in the PyFlink Cassandra connector. */
+public class SimpleClusterBuilder extends ClusterBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static Cluster.Builder clusterBuilder;

Review Comment:
   @dianfu The Cluster.Builder cannot be serialized, so I have updated the implementation in the buildCluster method. PTAL~
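
The idea behind this change can be illustrated outside of Java. The following is only a minimal Python sketch, not the code in this PR: it uses JSON as a stand-in for Java serialization, and `FakeDriverBuilder`, `SerializableClusterConfig`, `to_json`, `from_json` and `build_cluster` are hypothetical names chosen for illustration. It keeps only plain settings as serializable state and creates the non-serializable builder inside the build method.

```python
import json


class FakeDriverBuilder:
    """Hypothetical stand-in for a driver-side builder that is not serializable."""

    def __init__(self):
        self.settings = {}

    def apply(self, key, value):
        self.settings[key] = value
        return self


class SerializableClusterConfig:
    """Holds only plain settings; the driver builder is created on demand."""

    def __init__(self, settings=None):
        self._settings = dict(settings or {})

    def with_cluster_name(self, name):
        self._settings["cluster_name"] = name
        return self

    def with_port(self, port):
        self._settings["port"] = port
        return self

    def to_json(self):
        # Only the plain settings are serialized, never a builder instance.
        return json.dumps(self._settings, sort_keys=True)

    @classmethod
    def from_json(cls, payload):
        return cls(json.loads(payload))

    def build_cluster(self):
        # The builder is created here, after deserialization, so it is never
        # part of the serialized state.
        builder = FakeDriverBuilder()
        for key, value in self._settings.items():
            builder.apply(key, value)
        return builder


config = SerializableClusterConfig().with_cluster_name("demo").with_port(9042)
restored = SerializableClusterConfig.from_json(config.to_json())
built = restored.build_cluster()
print(built.settings)
```

Deferring construction this way means the object that is shipped around carries nothing but primitive values, at the cost of replaying the settings each time the cluster is built.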





[GitHub] [flink] dianfu commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911637191


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+

Review Comment:
   Could you update the unit tests to cover the added functionality?
   PS: test_connector.py has been moved to the directory pyflink/datastream/connectors/tests/, so the PR needs to be rebased when adding tests.



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':

Review Comment:
   ```suggestion
       def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
   ```



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -135,6 +279,168 @@ class ClusterBuilder(object):
     def __init__(self, j_cluster_builder):
         self._j_cluster_builder = j_cluster_builder

Review Comment:
   If we introduce class SimpleClusterBuilder, we could set self._j_cluster_builder to SimpleClusterBuilder() in the constructor.
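
A minimal sketch of this suggestion, under stated assumptions: `FakeSimpleClusterBuilder` is a hypothetical pure-Python stand-in for the Java `SimpleClusterBuilder`, which in PyFlink would presumably be obtained through the Py4J gateway instead. Only the constructor default and one delegating method are shown.

```python
class FakeSimpleClusterBuilder:
    """Hypothetical stand-in for the Java SimpleClusterBuilder object."""

    def __init__(self):
        self.calls = []

    def withClusterName(self, name):
        # Record the call so the delegation can be observed.
        self.calls.append(("withClusterName", name))
        return self


class ClusterBuilder:
    def __init__(self, j_cluster_builder=None):
        # Assumption: when no Java builder is passed in, create the simple
        # one so that the with_* methods always have a delegate.
        if j_cluster_builder is None:
            j_cluster_builder = FakeSimpleClusterBuilder()
        self._j_cluster_builder = j_cluster_builder

    def with_cluster_name(self, name: str) -> 'ClusterBuilder':
        self._j_cluster_builder.withClusterName(name)
        return self


builder = ClusterBuilder().with_cluster_name("demo")
print(builder._j_cluster_builder.calls)
```

With a default delegate, callers no longer have to supply a Java object themselves, which answers the question of where `_j_cluster_builder` comes from.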



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,6 +127,149 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def default_load_balancing_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with token awareness.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the
+        next one (following the round-robin order) is tried, until all hosts have been tried.
+
+        This policy is not datacenter aware and will include every known Cassandra host in its
+        round-robin algorithm. If you use multiple datacenters this will be inefficient, and you
+        will want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be
+    created (in effect, the driver resets the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits exponentially longer between each reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry should be specified.
+    - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the
+              operation that triggered the Cassandra exception will return an empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def default_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.

Review Comment:
   It's not clear what the behavior of the default retry policy is. It would be great to document it clearly and to update the method name accordingly.



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -135,6 +279,168 @@ class ClusterBuilder(object):
     def __init__(self, j_cluster_builder):
         self._j_cluster_builder = j_cluster_builder
 
+    def with_cluster_name(self, name: str) -> 'ClusterBuilder':
+        """
+        An optional name for the created cluster.
+
+        Note: this is not related to the Cassandra cluster name (though you are free to provide the
+        same name).
+        """
+        self._j_cluster_builder.withClusterName(name)

Review Comment:
   Where does the _j_cluster_builder come from? I guess we need to introduce a class SimpleClusterBuilder.





[GitHub] [flink] pengmide commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
pengmide commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911803654


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with token awareness.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the
+        next one (following the round-robin order) is tried, until all hosts have been tried.
+
+        This policy is not datacenter aware and will include every known Cassandra host in its
+        round-robin algorithm. If you use multiple datacenters this will be inefficient, and you
+        will want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be
+    created (in effect, the driver resets the scheduler).
+
+    The default ExponentialReconnectionPolicy policy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        The default reconnection policy.
+
+        A reconnection policy that waits exponentially longer between each reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry should be specified.
+    - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the
+              operation that triggered the Cassandra exception will return an empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def consistency_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+
+        This policy retries queries only in the following cases:
+        - On a read timeout, retries once on the same host if enough replicas replied but data was
+          not retrieved.
+        - On a write timeout, retries once on the same host if we time out while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, retries on the next host.
+
+        It does not retry on read or write failures.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """
+        A retry policy that never retries (nor ignores).
+        """
+        JFallthroughRetryPolicy = get_gateway().jvm.com.datastax.driver.core.policies. \
+            FallthroughRetryPolicy
+        return RetryPolicy(JFallthroughRetryPolicy.INSTANCE)
+
+
+class SpeculativeExecutionPolicy(object):
+    """
+    The policy that decides if the driver will send speculative queries to the next hosts when the
+    current host takes too long to respond.
+
+    Note that only idempotent statements will be speculatively retried.
+    """
+
+    def __init__(self, j_speculative_execution_policy):
+        self._j_speculative_execution_policy = j_speculative_execution_policy
+

Review Comment:
   What about fixing this in a separate PR as this PR is already very big?





[GitHub] [flink] flinkbot commented on pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20120:
URL: https://github.com/apache/flink/pull/20120#issuecomment-1171884235

   ## CI report:
   
   * 5ce536787b928c5434d228219b25f47f475e48f7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] dianfu commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911791239


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':

Review Comment:
   Could we also investigate whether it's possible to support the following policies:
   
   - WhiteListPolicy: See WhiteListPolicy.ofHosts for more details on how to create WhiteListPolicy
   - HostFilterPolicy: See HostFilterPolicy.fromDCWhiteList and HostFilterPolicy.fromDCBlackList for more details
   - LatencyAwarePolicy
   - TokenAwarePolicy



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.

Review Comment:
   The doc could be updated to the following: 
   ```
   A DCAwareRoundRobinPolicy with token awareness. This is also the default load balancing policy.
   ```





[GitHub] [flink] dianfu commented on a diff in pull request #20120: [FLINK-27964][python] Support Cassandra connector in Python DataStream API

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20120:
URL: https://github.com/apache/flink/pull/20120#discussion_r911793908


##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with token awareness.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the
+        next one (following the round-robin order) is tried, until all hosts have been tried.
+
+        This policy is not datacenter aware and will include every known Cassandra host in its
+        round-robin algorithm. If you use multiple datacenters this will be inefficient, and you
+        will want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be
+    created (in effect, the driver resets the scheduler).
+
+    The default ExponentialReconnectionPolicy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        The default reconnection policy.
+
+        A reconnection policy that waits exponentially longer between each reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry should be specified.
+    - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the
+              operation that triggered the Cassandra exception will return an empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def consistency_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+
+        This policy retries queries only in the following cases:
+        - On a read timeout, retries once on the same host if enough replicas replied but data was
+          not retrieved.
+        - On a write timeout, retries once on the same host if we time out while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, the query is retried on the next host.
+
+        This policy does not retry on read or write failures.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """

Review Comment:
   Add LoggingRetryPolicy?
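
A rough sketch of what this suggestion might look like, following the pattern of the other factory methods in the hunk. It is not the PR's code: `logging_retry_policy` is a hypothetical method name, and the `jvm` parameter and `fake_jvm` helper are assumptions introduced only so the snippet runs without a Flink gateway (the PR obtains the JVM view from `get_gateway().jvm`). In the DataStax 3.x driver, `LoggingRetryPolicy` wraps another `RetryPolicy` passed to its constructor and logs the decisions of that policy.

```python
from types import SimpleNamespace


class RetryPolicy(object):
    """Minimal stand-in for the wrapper class shown in the hunk."""

    def __init__(self, j_retry_policy):
        self._j_retry_policy = j_retry_policy

    @staticmethod
    def logging_retry_policy(policy: 'RetryPolicy', jvm) -> 'RetryPolicy':
        """
        Hypothetical factory: wraps `policy` so that its retry decisions are logged.
        `jvm` is an assumed parameter; the PR would use get_gateway().jvm instead.
        """
        JLoggingRetryPolicy = jvm.com.datastax.driver.core.policies.LoggingRetryPolicy
        # Unwrap the Python-side wrapper and hand the Java policy to the constructor.
        return RetryPolicy(JLoggingRetryPolicy(policy._j_retry_policy))


def fake_jvm():
    """Fake JVM view, for illustration only; it records the constructor argument."""
    policies = SimpleNamespace(
        LoggingRetryPolicy=lambda inner: ("LoggingRetryPolicy", inner))
    core = SimpleNamespace(policies=policies)
    driver = SimpleNamespace(core=core)
    return SimpleNamespace(com=SimpleNamespace(datastax=SimpleNamespace(driver=driver)))


wrapped = RetryPolicy.logging_retry_policy(RetryPolicy("j_default_policy"), fake_jvm())
print(wrapped._j_retry_policy)
```

Taking an existing `RetryPolicy` as the argument keeps the factory composable with `consistency_retry_policy()` and `fallthrough_retry_policy()`.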



##########
flink-python/pyflink/datastream/connectors/cassandra.py:
##########
@@ -126,14 +132,340 @@ def if_not_exists(self, enabled: bool) -> 'MapperOptions':
         return self
 
 
+# ---- Classes introduced to construct the ClusterBuilder ----
+
+
+class LoadBalancingPolicy(object):
+    """
+    The policy that decides which Cassandra hosts to contact for each new query.
+
+    The LoadBalancingPolicy is informed of hosts up/down events. For efficiency purposes, the policy
+    is expected to exclude down hosts from query plans.
+    """
+
+    def __init__(self, j_load_balancing_policy):
+        self._j_load_balancing_policy = j_load_balancing_policy
+
+    @staticmethod
+    def dc_aware_round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        The default load balancing policy.
+
+        The default load balancing policy is DCAwareRoundRobinPolicy with token awareness.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return LoadBalancingPolicy(JPolicies.defaultLoadBalancingPolicy())
+
+    @staticmethod
+    def round_robin_policy() -> 'LoadBalancingPolicy':
+        """
+        A Round-robin load balancing policy.
+
+        This policy queries nodes in a round-robin fashion. For a given query, if a host fails, the
+        next one (following the round-robin order) is tried, until all hosts have been tried.
+
+        This policy is not datacenter aware and will include every known Cassandra host in its
+        round-robin algorithm. If you use multiple datacenters this will be inefficient, and you
+        will want to use the DCAwareRoundRobinPolicy load balancing policy instead.
+        """
+        JRoundRobinPolicy = get_gateway().jvm.com.datastax.driver.core.policies.RoundRobinPolicy
+        return LoadBalancingPolicy(JRoundRobinPolicy())
+
+
+class ReconnectionPolicy(object):
+    """
+    Policy that decides how often the reconnection to a dead node is attempted.
+
+    Note that if the driver receives a push notification from the Cassandra cluster that a node is
+    UP, any existing ReconnectionSchedule on that node will be cancelled and a new one will be
+    created (in effect, the driver resets the scheduler).
+
+    The default ExponentialReconnectionPolicy is usually adequate.
+    """
+
+    def __init__(self, j_reconnection_policy):
+        self._j_reconnection_policy = j_reconnection_policy
+
+    @staticmethod
+    def exponential_reconnection_policy(base_delay_ms: int = 1000, max_delay_ms: int = 600000) \
+            -> 'ReconnectionPolicy':
+        """
+        The default reconnection policy.
+
+        A reconnection policy that waits exponentially longer between each reconnection attempt
+        (but keeps a constant delay once a maximum delay is reached).
+        """
+        JExponentialReconnectionPolicy = get_gateway().jvm. \
+            com.datastax.driver.core.policies.ExponentialReconnectionPolicy
+        return ReconnectionPolicy(JExponentialReconnectionPolicy(base_delay_ms, max_delay_ms))
+
+    @staticmethod
+    def constant_reconnection_policy(constant_delay_ms: int) -> 'ReconnectionPolicy':
+        """
+        A reconnection policy that waits a constant time between each reconnection attempt.
+        """
+        JConstantReconnectionPolicy = get_gateway().jvm.\
+            com.datastax.driver.core.policies.ConstantReconnectionPolicy
+        return ReconnectionPolicy(JConstantReconnectionPolicy(constant_delay_ms))
+
+
+class RetryPolicy(object):
+    """
+    A policy that defines a default behavior to adopt when a request fails.
+
+    There are three possible decisions:
+    - RETHROW: no retry should be attempted and an exception should be thrown.
+    - RETRY: the operation will be retried. The consistency level of the retry should be specified.
+    - IGNORE: no retry should be attempted and the exception should be ignored. In that case, the
+              operation that triggered the Cassandra exception will return an empty result set.
+    """
+
+    def __init__(self, j_retry_policy):
+        self._j_retry_policy = j_retry_policy
+
+    @staticmethod
+    def consistency_retry_policy() -> 'RetryPolicy':
+        """
+        The default retry policy.
+
+        This policy retries queries only in the following cases:
+        - On a read timeout, retries once on the same host if enough replicas replied but data was
+          not retrieved.
+        - On a write timeout, retries once on the same host if we time out while writing the
+          distributed log used by batch statements.
+        - On an unavailable exception, retries once on the next host.
+        - On a request error, such as a client timeout, the query is retried on the next host.
+
+        This policy does not retry on read or write failures.
+        """
+        JPolicies = get_gateway().jvm.com.datastax.driver.core.policies.Policies
+        return RetryPolicy(JPolicies.defaultRetryPolicy())
+
+    @staticmethod
+    def fallthrough_retry_policy() -> 'RetryPolicy':
+        """
+        A retry policy that never retries (nor ignores).
+        """
+        JFallthroughRetryPolicy = get_gateway().jvm.com.datastax.driver.core.policies. \
+            FallthroughRetryPolicy
+        return RetryPolicy(JFallthroughRetryPolicy.INSTANCE)
+
+
+class SpeculativeExecutionPolicy(object):
+    """
+    The policy that decides if the driver will send speculative queries to the next hosts when the
+    current host takes too long to respond.
+
+    Note that only idempotent statements will be speculatively retried.
+    """
+
+    def __init__(self, j_speculative_execution_policy):
+        self._j_speculative_execution_policy = j_speculative_execution_policy
+

Review Comment:
   Also add PercentileSpeculativeExecutionPolicy?
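
A possible shape for this, as a sketch under stated assumptions rather than the PR's implementation. The method name `percentile_speculative_execution_policy`, the `jvm` parameter, and the `fake_jvm` helper are hypothetical; the fake exists only so the snippet runs without a Flink gateway. The Java calls assume the DataStax 3.x API, where a `PercentileSpeculativeExecutionPolicy` is constructed from a percentile tracker (here built with `ClusterWidePercentileTracker.builder(...)`), a percentile, and a maximum number of speculative executions.

```python
from types import SimpleNamespace


class SpeculativeExecutionPolicy(object):
    """Minimal stand-in for the wrapper class shown in the hunk."""

    def __init__(self, j_speculative_execution_policy):
        self._j_speculative_execution_policy = j_speculative_execution_policy

    @staticmethod
    def percentile_speculative_execution_policy(
            jvm, highest_trackable_latency_ms: int, percentile: float,
            max_speculative_executions: int) -> 'SpeculativeExecutionPolicy':
        """
        Hypothetical factory. `jvm` is an assumed parameter; the PR would use
        get_gateway().jvm instead.
        """
        j_core = jvm.com.datastax.driver.core
        # The policy needs a latency tracker to compute the percentile threshold.
        j_tracker = j_core.ClusterWidePercentileTracker.builder(
            highest_trackable_latency_ms).build()
        JPolicy = j_core.policies.PercentileSpeculativeExecutionPolicy
        return SpeculativeExecutionPolicy(
            JPolicy(j_tracker, percentile, max_speculative_executions))


def fake_jvm():
    """Fake JVM view, for illustration only; it records the constructor arguments."""
    tracker_builder = SimpleNamespace(build=lambda: "tracker")
    core = SimpleNamespace(
        ClusterWidePercentileTracker=SimpleNamespace(builder=lambda ms: tracker_builder),
        policies=SimpleNamespace(
            PercentileSpeculativeExecutionPolicy=lambda t, p, n: (t, p, n)))
    driver = SimpleNamespace(core=core)
    return SimpleNamespace(com=SimpleNamespace(datastax=SimpleNamespace(driver=driver)))


policy = SpeculativeExecutionPolicy.percentile_speculative_execution_policy(
    fake_jvm(), 15000, 99.0, 2)
print(policy._j_speculative_execution_policy)
```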



##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/SimpleClusterBuilder.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metrics;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.policies.AddressTranslator;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.Policies;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+/** A simple ClusterBuilder that is currently used by the PyFlink Cassandra connector. */
+public class SimpleClusterBuilder extends ClusterBuilder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static Cluster.Builder clusterBuilder;

Review Comment:
   ```suggestion
       private final Cluster.Builder clusterBuilder;
   ```
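
The comment does not spell out its reasoning. One likely concern is that a `public static` field is shared by every `SimpleClusterBuilder` instance, so configuring one builder silently changes the others. The Python sketch below is only an analogy for that hazard, with hypothetical class names: a class-level attribute plays the role of the Java `static` field, and an attribute set in `__init__` plays the role of the suggested instance field.

```python
class SharedBuilder:
    # Class-level attribute: the analogue of a Java `static` field.
    settings = {}

    def with_port(self, port):
        # Mutates the single dict shared by all SharedBuilder instances.
        self.settings["port"] = port
        return self


class PerInstanceBuilder:
    def __init__(self):
        # Instance-level attribute: the analogue of a `private final` field.
        self._settings = {}

    def with_port(self, port):
        # Mutates only this instance's dict.
        self._settings["port"] = port
        return self


a = SharedBuilder().with_port(9042)
b = SharedBuilder().with_port(9142)   # also overwrites the port seen through `a`

c = PerInstanceBuilder().with_port(9042)
d = PerInstanceBuilder().with_port(9142)  # leaves `c` untouched
```

With the shared variant, `a` ends up with the port set through `b`; with the per-instance variant, `c` and `d` keep their own values.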


