Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/16 18:08:38 UTC

[GitHub] [beam] nfisher opened a new pull request #11732: [BEAM-10017] Expose Cassandra SocketOptions

nfisher opened a new pull request #11732:
URL: https://github.com/apache/beam/pull/11732


   This PR is intended to expose all of the available Cassandra client socket options so the client can be tuned for various environments.
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] nfisher commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
nfisher commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r431519019



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1142,7 +1201,9 @@ private static Cluster getCluster(
               spec.username(),
               spec.password(),
               spec.localDc(),
-              spec.consistencyLevel());
+              spec.consistencyLevel(),
+              spec.connectTimeout(),
+              null);

Review comment:
       done







[GitHub] [beam] echauchot commented on pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-635960916


   @nfisher I reviewed your latest changes. Only a few small changes are still missing; please refer to the unresolved conversations.





[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r432455805



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));

Review comment:
       Add two checkArgument calls, as in the other methods: one for null and one for > 0. Also, I made an error above: the ValueProvider version is the low-level method and the other version relies on it, so the checks need to go in the ValueProvider version.
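
A minimal sketch of what this comment seems to ask for; it is not the code that was merged. `TimeoutProvider`, `ReadSketch` and the local `checkArgument` helper are hypothetical stand-ins (for Beam's ValueProvider, the Read transform and Guava's Preconditions.checkArgument), chosen so that the example compiles without any dependencies. Both checks live in the provider overload, and the Integer overload only delegates:

```java
/** Hypothetical stand-in for Beam's ValueProvider<Integer>; not the real interface. */
interface TimeoutProvider {
  Integer get();
}

/** Hypothetical, heavily reduced stand-in for CassandraIO.Read. */
final class ReadSketch {
  private final TimeoutProvider connectTimeout; // null means "not configured"

  ReadSketch(TimeoutProvider connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  /** Convenience overload: wraps the value and delegates, so it needs no checks of its own. */
  ReadSketch withConnectTimeout(Integer timeoutMillis) {
    return withConnectTimeout(() -> timeoutMillis);
  }

  /** Low-level overload: every caller ends up here, so both checks live here. */
  ReadSketch withConnectTimeout(TimeoutProvider timeout) {
    checkArgument(timeout != null, "connect timeout provider can not be null");
    Integer millis = timeout.get();
    checkArgument(millis != null, "connect timeout can not be null");
    checkArgument(millis > 0, "connect timeout must be > 0, but was: " + millis);
    return new ReadSketch(timeout);
  }

  Integer connectTimeoutMillis() {
    return connectTimeout == null ? null : connectTimeout.get();
  }

  /** Local helper with the same contract as Guava's Preconditions.checkArgument. */
  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

One caveat with this reading: calling get() while the transform is being built only works for values that are already available. That holds for the static wrapper used by the Integer overload, but it may not hold for values that are supplied at run time.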







[GitHub] [beam] nfisher commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
nfisher commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r428008097



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Done with the first two. Will do the third after work.







[GitHub] [beam] nfisher commented on pull request #11732: [BEAM-10017] Expose Cassandra SocketOptions

Posted by GitBox <gi...@apache.org>.
nfisher commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-629687704


   R: @echauchot 
   
   Not sure if exposing all of the configuration parameters is desirable. Let me know what you think.





[GitHub] [beam] nfisher commented on pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
nfisher commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-638299088


   @echauchot sorry, things are a little crazy with work at the moment. I was hoping to get to the remainder of it this weekend.





[GitHub] [beam] echauchot commented on pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-638214566


   @nfisher you seem to be short on time; don't worry, I'll do the final touches. I will manually merge your commit to master along with one of my own and close the PR.





[GitHub] [beam] echauchot commented on pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-638758007


   Did the final minor touches myself and merged the PR. Thanks Nathan for your work!





[GitHub] [beam] nfisher commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
nfisher commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r431519624



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1023,6 +1065,8 @@ private String getMutationTypeName() {
 
       abstract Builder<T> setMutationType(MutationType mutationType);
 
+      abstract Builder<T> setConnectTimeout(ValueProvider<Integer> timeout);

Review comment:
       done







[GitHub] [beam] nfisher edited a comment on pull request #11732: [BEAM-10017] Expose Cassandra SocketOptions

Posted by GitBox <gi...@apache.org>.
nfisher edited a comment on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-629687704


   Not sure if exposing all of the configuration parameters is desirable. Let me know what you think.





[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r432461930



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {

Review comment:
       I was wrong: the null tests are needed because there is no default value. If the user does not specify timeouts, the ValueProvider will be null.
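
The guard described here can be sketched as follows. `SocketOptionsStandIn` is a hypothetical stand-in for the driver's SocketOptions, and `java.util.function.Supplier` stands in for ValueProvider. The setter name `setConnectTimeoutMillis` appears in the diff; `setReadTimeoutMillis` is assumed by analogy. The default values are placeholders and are not taken from this thread:

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the driver's SocketOptions; defaults are placeholders. */
final class SocketOptionsStandIn {
  private int connectTimeoutMillis = 5000;
  private int readTimeoutMillis = 12000;

  void setConnectTimeoutMillis(int millis) {
    this.connectTimeoutMillis = millis;
  }

  void setReadTimeoutMillis(int millis) {
    this.readTimeoutMillis = millis;
  }

  int getConnectTimeoutMillis() {
    return connectTimeoutMillis;
  }

  int getReadTimeoutMillis() {
    return readTimeoutMillis;
  }
}

final class ClusterConfigSketch {
  /**
   * Builds socket options, overriding a timeout only when the caller configured it.
   * A null supplier means "not specified", so the stand-in's default is left untouched.
   */
  static SocketOptionsStandIn socketOptions(
      Supplier<Integer> connectTimeout, Supplier<Integer> readTimeout) {
    SocketOptionsStandIn options = new SocketOptionsStandIn();
    if (connectTimeout != null) {
      options.setConnectTimeoutMillis(connectTimeout.get());
    }
    if (readTimeout != null) {
      options.setReadTimeoutMillis(readTimeout.get());
    }
    return options;
  }
}
```

Without the null checks, a pipeline that sets neither timeout would fail with a NullPointerException on the first `get()` call instead of falling back to the defaults.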







[GitHub] [beam] nfisher commented on pull request #11732: [BEAM-10017] Expose Cassandra SocketOptions

Posted by GitBox <gi...@apache.org>.
nfisher commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-629688041


   R: @echauchot 





[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r427861203



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -106,19 +107,13 @@
  *
  * <h3>Cassandra Socket Options</h3>
  *
- * <p>The following example illustrates various options for tuning client socket:
+ * <p>The following example illustrates setting timeouts for the Cassandra client:

Review comment:
       Can you also fix a leftover in the javadoc? It should say: An IO to read **and write** **from/to** Apache Cassandra.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       - You need javadoc for both methods, as they are both public (there are two versions for backward compatibility, since ValueProvider was introduced recently).
   
   - Specify that the values are in milliseconds.
   
   - Add links to the Cassandra SocketOptions setConnectTimeoutMillis method.
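
As an illustration of the three points above, the javadoc could look roughly like this. `DocumentedReadSketch` is a hypothetical stand-in for the Read transform and `java.util.function.Supplier` stands in for ValueProvider. The driver method is referenced with `{@code ...}` rather than a URL because no documentation link is given in this thread:

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the Read transform; only the documented overloads matter here. */
final class DocumentedReadSketch {
  private final Supplier<Integer> connectTimeout;

  DocumentedReadSketch(Supplier<Integer> connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  /**
   * Specify the Cassandra client connect timeout in milliseconds.
   *
   * <p>Corresponds to the driver's {@code SocketOptions#setConnectTimeoutMillis(int)}.
   *
   * @param timeoutMillis connect timeout in milliseconds
   */
  DocumentedReadSketch withConnectTimeout(Integer timeoutMillis) {
    return withConnectTimeout(() -> timeoutMillis);
  }

  /**
   * Specify the Cassandra client connect timeout in milliseconds, as a lazily supplied value.
   *
   * <p>Corresponds to the driver's {@code SocketOptions#setConnectTimeoutMillis(int)}.
   *
   * @param timeoutMillis supplier of the connect timeout in milliseconds
   */
  DocumentedReadSketch withConnectTimeout(Supplier<Integer> timeoutMillis) {
    return new DocumentedReadSketch(timeoutMillis);
  }

  /** Returns the configured supplier, or null if no connect timeout was specified. */
  Supplier<Integer> connectTimeout() {
    return connectTimeout;
  }
}
```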

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));

Review comment:
       Add an input value check (!= null && > 0) with a checkArgument call, as in the other methods. As the ValueProvider version of the method relies on this version, put the checkArgument here. Do not forget to add the same validation to the methods for the other parameters.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();
+    }
+
+    /** Cassandra client socket option to set the read timeout. */
+    public Read<T> withReadTimeout(Integer timeout) {
+      return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {

Review comment:
       same as above

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();

Review comment:
       add read timeout

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1023,6 +1065,8 @@ private String getMutationTypeName() {
 
       abstract Builder<T> setMutationType(MutationType mutationType);
 
+      abstract Builder<T> setConnectTimeout(ValueProvider<Integer> timeout);

Review comment:
       add read timeout

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Add withReadTimeout, cf. the global review comment.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -811,6 +840,9 @@ public T getCurrent() throws NoSuchElementException {
 
     abstract MutationType mutationType();
 
+    @Nullable

Review comment:
       Add readTimeout, cf. the global review comment.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {

Review comment:
       The null check is no longer needed if both timeouts are set as part of Read and Write and if you add input validation in the with* methods.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {
+      socketOptions.setConnectTimeoutMillis(connectTimeout.get());
+    }
+
+    if (readTimeout != null) {

Review comment:
       same as above

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1142,7 +1201,9 @@ private static Cluster getCluster(
               spec.username(),
               spec.password(),
               spec.localDc(),
-              spec.consistencyLevel());
+              spec.consistencyLevel(),
+              spec.connectTimeout(),
+              null);

Review comment:
       pass the read timeout







[GitHub] [beam] nfisher commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
nfisher commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r431519562



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Done line 1010







[GitHub] [beam] echauchot commented on pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-635975058


   @nfisher please also run the spotless task to fix the formatting violations.





[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r432453437



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Missing link. Also write the javadoc as in the other with* methods, starting with "Specify ...".







[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r432453785



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();
+    }
+
+    /** Cassandra client socket option to set the read timeout. */
+    public Read<T> withReadTimeout(Integer timeout) {
+      return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {

Review comment:
       missing link







[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r432453437



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       missing link







[GitHub] [beam] nfisher commented on pull request #11732: [BEAM-10017] Expose Cassandra SocketOptions

Posted by GitBox <gi...@apache.org>.
nfisher commented on pull request #11732:
URL: https://github.com/apache/beam/pull/11732#issuecomment-629688169


   R: @jbonofre





[GitHub] [beam] echauchot merged pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot merged pull request #11732:
URL: https://github.com/apache/beam/pull/11732


   





[GitHub] [beam] echauchot commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r432455805



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));

Review comment:
       Add two checkArgument calls, as in the other with* methods: one for null and one for > 0. Also, I made an error above: the ValueProvider version is the low-level method and the other one relies on it, so the checks need to go in the ValueProvider version.
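
       For illustration only, a minimal sketch of what the comment above asks for. It does not depend on Beam: `TimeoutCheckSketch`, the nested `ValueProvider` interface, `staticValue` and the local `checkArgument` helper are hypothetical stand-ins, and the error messages are assumptions, not the code that was merged. The value checks run only when the provider's value is already accessible, which is a design assumption for providers whose value is not known at construction time.

```java
/** Simplified stand-in for the Read builder; not the actual CassandraIO code. */
class TimeoutCheckSketch {

  /** Hypothetical stand-in for a value provider; only the two methods used here. */
  interface ValueProvider<T> {
    T get();

    boolean isAccessible();
  }

  /** Stand-in for a static provider whose value is known at construction time. */
  static <T> ValueProvider<T> staticValue(T value) {
    return new ValueProvider<T>() {
      @Override
      public T get() {
        return value;
      }

      @Override
      public boolean isAccessible() {
        return true;
      }
    };
  }

  /** Local helper that throws IllegalArgumentException when the condition is false. */
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  private ValueProvider<Integer> connectTimeout;

  ValueProvider<Integer> connectTimeout() {
    return connectTimeout;
  }

  /** Convenience overload: wraps the value and delegates to the low-level overload. */
  public TimeoutCheckSketch withConnectTimeout(Integer timeout) {
    return withConnectTimeout(staticValue(timeout));
  }

  /** Low-level overload: the null check and the > 0 check both live here. */
  public TimeoutCheckSketch withConnectTimeout(ValueProvider<Integer> timeout) {
    checkArgument(timeout != null, "timeout can not be null");
    if (timeout.isAccessible()) {
      Integer value = timeout.get();
      checkArgument(value != null, "timeout can not be null");
      checkArgument(value > 0, "timeout must be > 0, but was: " + value);
    }
    // Return a new instance so the receiver is left unchanged.
    TimeoutCheckSketch copy = new TimeoutCheckSketch();
    copy.connectTimeout = timeout;
    return copy;
  }
}
```

       Because the `Integer` overload only wraps and delegates, a bad value is rejected whichever overload the caller uses, and the same pattern would apply to withReadTimeout.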







[GitHub] [beam] nfisher commented on a change in pull request #11732: [BEAM-10017] Expose Cassandra Connect and Read timeouts

Posted by GitBox <gi...@apache.org>.
nfisher commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r428007520



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -106,19 +107,13 @@
  *
  * <h3>Cassandra Socket Options</h3>
  *
- * <p>The following example illustrates various options for tuning client socket:
+ * <p>The following example illustrates setting timeouts for the Cassandra client:

Review comment:
       Done



