Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/05/20 10:19:00 UTC

[jira] [Work logged] (BEAM-10017) Expose SocketOptions timeouts in CassandraIO

     [ https://issues.apache.org/jira/browse/BEAM-10017?focusedWorklogId=435417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435417 ]

ASF GitHub Bot logged work on BEAM-10017:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/20 10:18
            Start Date: 20/May/20 10:18
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r427861203



##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -106,19 +107,13 @@
  *
  * <h3>Cassandra Socket Options</h3>
  *
- * <p>The following example illustrates various options for tuning client socket:
+ * <p>The following example illustrates setting timeouts for the Cassandra client:

Review comment:
       Can you also fix a leftover in the javadoc: "An IO to read **and write** **from/to** Apache Cassandra"?

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       - You need a javadoc for both methods, as they are both public (there are two versions for backward compatibility, since ValueProvider support was introduced later).
   
   - Specify that the timeouts are in milliseconds.
   
   - Add links to the Cassandra SocketOptions#setConnectTimeoutMillis method.
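A minimal, self-contained sketch of what the requested javadoc could look like on the two public overloads. `ValueProvider` and `StaticValueProvider` below are hypothetical local stand-ins for Beam's types so the snippet compiles on its own, and `Read` is a simplified stand-in for `CassandraIO.Read`, not the real class. The `@see` tag points at the DataStax 3.x `SocketOptions#setConnectTimeoutMillis(int)` method listed in the issue.

```java
import java.io.Serializable;

/** Sketch: documented connect-timeout overloads on a hypothetical, simplified Read spec. */
final class ReadTimeoutJavadocSketch {

  /** Hypothetical stand-in for Beam's ValueProvider. */
  interface ValueProvider<T> extends Serializable {
    T get();
  }

  /** Hypothetical stand-in for Beam's ValueProvider.StaticValueProvider. */
  static final class StaticValueProvider<T> implements ValueProvider<T> {
    private final T value;

    private StaticValueProvider(T value) {
      this.value = value;
    }

    static <T> StaticValueProvider<T> of(T value) {
      return new StaticValueProvider<>(value);
    }

    @Override
    public T get() {
      return value;
    }
  }

  static final class Read {
    private final ValueProvider<Integer> connectTimeout;

    Read(ValueProvider<Integer> connectTimeout) {
      this.connectTimeout = connectTimeout;
    }

    /**
     * Cassandra client socket option to set the connect timeout, in milliseconds.
     *
     * @param timeout connect timeout in milliseconds
     * @see com.datastax.driver.core.SocketOptions#setConnectTimeoutMillis(int)
     */
    Read withConnectTimeout(Integer timeout) {
      // Wrap the static value and delegate to the ValueProvider overload.
      return withConnectTimeout(StaticValueProvider.of(timeout));
    }

    /**
     * Like {@link #withConnectTimeout(Integer)}, but the timeout in milliseconds may be
     * supplied when the pipeline runs.
     *
     * @param timeout provider of the connect timeout in milliseconds
     * @see com.datastax.driver.core.SocketOptions#setConnectTimeoutMillis(int)
     */
    Read withConnectTimeout(ValueProvider<Integer> timeout) {
      return new Read(timeout);
    }

    ValueProvider<Integer> connectTimeout() {
      return connectTimeout;
    }
  }
}
```

Javadoc references such as `@see` are not resolved by plain `javac`, so the sketch compiles without the DataStax driver on the classpath.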

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));

Review comment:
       Add an input value check (!= null && > 0) with a checkArgument call, as in the other methods. As the ValueProvider version of the method relies on this version, put the checkArgument here. Do not forget to add the same validation to the other parameter methods.
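A sketch of the requested input check, assuming it lives in the `Integer` overload as the comment suggests, where the static value is available when the transform is built. `checkArgument` here is a local helper that mimics Guava's `Preconditions.checkArgument(boolean, String, Object...)` (Beam uses a vendored Guava), and `Read` is a hypothetical simplified spec rather than the real `CassandraIO.Read`.

```java
/** Sketch: validate a timeout in the Integer overload before storing it. */
final class TimeoutValidationSketch {

  /** Local helper mimicking Guava's Preconditions.checkArgument(boolean, String, Object...). */
  static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Hypothetical Read spec holding only the connect timeout in milliseconds. */
  static final class Read {
    private final Integer connectTimeoutMillis;

    Read(Integer connectTimeoutMillis) {
      this.connectTimeoutMillis = connectTimeoutMillis;
    }

    /** Sets the connect timeout in milliseconds; rejects null and non-positive values. */
    Read withConnectTimeout(Integer timeout) {
      // Short-circuit && avoids unboxing a null timeout.
      checkArgument(
          timeout != null && timeout > 0,
          "CassandraIO connect timeout must be > 0, but was: %s",
          timeout);
      return new Read(timeout);
    }

    Integer connectTimeoutMillis() {
      return connectTimeoutMillis;
    }
  }
}
```

The same pattern would apply to `withReadTimeout` and to the `Write` overloads.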

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();
+    }
+
+    /** Cassandra client socket option to set the read timeout. */
+    public Read<T> withReadTimeout(Integer timeout) {
+      return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {

Review comment:
       same as above

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();

Review comment:
       add read timeout

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1023,6 +1065,8 @@ private String getMutationTypeName() {
 
       abstract Builder<T> setMutationType(MutationType mutationType);
 
+      abstract Builder<T> setConnectTimeout(ValueProvider<Integer> timeout);

Review comment:
       add read timeout
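One way to carry the read timeout alongside the connect timeout on `Write` and its builder, sketched as a hand-written immutable class. The real `CassandraIO.Write` is generated with AutoValue and stores `ValueProvider<Integer>` values; plain nullable `Integer`s are used here only to keep the sketch self-contained, and all names are hypothetical.

```java
/** Sketch: a Write spec carrying both socket timeouts (milliseconds, null when unset). */
final class WriteTimeoutsSketch {

  static final class Write {
    private final Integer connectTimeout;
    private final Integer readTimeout;

    private Write(Builder b) {
      this.connectTimeout = b.connectTimeout;
      this.readTimeout = b.readTimeout;
    }

    static Write create() {
      return new Builder().build();
    }

    Integer connectTimeout() {
      return connectTimeout;
    }

    Integer readTimeout() {
      return readTimeout;
    }

    /** Copies the current values into a fresh builder (what AutoValue's toBuilder() does). */
    Builder toBuilder() {
      return new Builder().setConnectTimeout(connectTimeout).setReadTimeout(readTimeout);
    }

    /** Returns a copy with the connect timeout (ms) set. */
    Write withConnectTimeout(Integer timeout) {
      return toBuilder().setConnectTimeout(timeout).build();
    }

    /** Returns a copy with the read timeout (ms) set; mirrors withConnectTimeout. */
    Write withReadTimeout(Integer timeout) {
      return toBuilder().setReadTimeout(timeout).build();
    }

    static final class Builder {
      private Integer connectTimeout;
      private Integer readTimeout;

      Builder setConnectTimeout(Integer timeout) {
        this.connectTimeout = timeout;
        return this;
      }

      Builder setReadTimeout(Integer timeout) {
        this.readTimeout = timeout;
        return this;
      }

      Write build() {
        return new Write(this);
      }
    }
  }
}
```

Each `with*` call returns a new `Write` and leaves the original unchanged, matching the `builder().setX(...).build()` pattern used in the diff.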

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Add withReadTimeout cf global review comment

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -811,6 +840,9 @@ public T getCurrent() throws NoSuchElementException {
 
     abstract MutationType mutationType();
 
+    @Nullable

Review comment:
       Add readTimeout cf global review comment

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {

Review comment:
       No null check is needed anymore if both timeouts are set as part of Read and Write and if you add input validation in the with* methods.

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {
+      socketOptions.setConnectTimeoutMillis(connectTimeout.get());
+    }
+
+    if (readTimeout != null) {

Review comment:
       same as above

##########
File path: sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1142,7 +1201,9 @@ private static Cluster getCluster(
               spec.username(),
               spec.password(),
               spec.localDc(),
-              spec.consistencyLevel());
+              spec.consistencyLevel(),
+              spec.connectTimeout(),
+              null);

Review comment:
       pass the read timeout




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 435417)
    Time Spent: 1h  (was: 50m)

> Expose SocketOptions timeouts in CassandraIO
> --------------------------------------------
>
>                 Key: BEAM-10017
>                 URL: https://issues.apache.org/jira/browse/BEAM-10017
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-cassandra
>            Reporter: Nathan Fisher
>            Priority: P3
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently there are no options to tune the client socket configuration of the CassandraIO reader/writer. Tuning it can be useful for slow clusters, large queries, or high-latency links.
> The intent is to expose the following configuration elements as setters on the CassandraIO builder, similar to withKeyspace and the other with* methods.
>  
> ||Class||Method||Description||
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setConnectTimeoutMillis|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setConnectTimeoutMillis-int-](int connectTimeoutMillis)}}|Sets the connection timeout in milliseconds.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setKeepAlive|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setKeepAlive-boolean-](boolean keepAlive)}}|Sets whether to enable TCP keepalive.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setReadTimeoutMillis|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setReadTimeoutMillis-int-](int readTimeoutMillis)}}|Sets the per-host read timeout in milliseconds.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setReceiveBufferSize|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setReceiveBufferSize-int-](int receiveBufferSize)}}|Sets a hint to the size of the underlying buffers for incoming network I/O.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setReuseAddress|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setReuseAddress-boolean-](boolean reuseAddress)}}|Sets whether to enable reuse-address.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setSendBufferSize|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setSendBufferSize-int-](int sendBufferSize)}}|Sets a hint to the size of the underlying buffers for outgoing network I/O.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setSoLinger|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setSoLinger-int-](int soLinger)}}|Sets the linger-on-close timeout.|
> |{{[SocketOptions|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html]}}|{{[setTcpNoDelay|https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setTcpNoDelay-boolean-](boolean tcpNoDelay)}}|Sets whether to disable Nagle's algorithm.|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)