You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/20 10:18:50 UTC

[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r427861203



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -106,19 +107,13 @@
  *
  * <h3>Cassandra Socket Options</h3>
  *
- * <p>The following example illustrates various options for tuning client socket:
+ * <p>The following example illustrates setting timeouts for the Cassandra client:

Review comment:
       Can you also change a leftover in the javadoc: An IO to read **and write** **from/to** Apache Cassandra ?

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       - you need a javadoc for both methods as they are both public (2 versions for backward compatibility as valueprovider was introduced lately)
   
   - specify that they are millis
   
   - add links to cassandra socketoptions setConnectTimeOut

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));

Review comment:
       Add input values check (!= null && > 0) as in the other methods with the checkArgument call. As the value provider version method relies on this version, put the checkArgument here. Do not forget to put the validation in the other parameters methods

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();
+    }
+
+    /** Cassandra client socket option to set the read timeout. */
+    public Read<T> withReadTimeout(Integer timeout) {
+      return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {

Review comment:
       same as above

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();

Review comment:
       add read timeout

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1023,6 +1065,8 @@ private String getMutationTypeName() {
 
       abstract Builder<T> setMutationType(MutationType mutationType);
 
+      abstract Builder<T> setConnectTimeout(ValueProvider<Integer> timeout);

Review comment:
       add read timeout

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Add withReadTimeout cf global review comment

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -811,6 +840,9 @@ public T getCurrent() throws NoSuchElementException {
 
     abstract MutationType mutationType();
 
+    @Nullable

Review comment:
       Add readTimeout cf glocal review comment

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {

Review comment:
       no more null check needed if both timeouts are set as part of Read and Write and if you add the validation of inputs in the with* methods

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {
+      socketOptions.setConnectTimeoutMillis(connectTimeout.get());
+    }
+
+    if (readTimeout != null) {

Review comment:
       same as above

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1142,7 +1201,9 @@ private static Cluster getCluster(
               spec.username(),
               spec.password(),
               spec.localDc(),
-              spec.consistencyLevel());
+              spec.consistencyLevel(),
+              spec.connectTimeout(),
+              null);

Review comment:
       pass the read timeout




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org