You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/06/30 13:25:47 UTC
[nifi] branch master updated: NIFI-7586 In CassandraSesionProvider
added properties to set socket-level read timeout and connect timeout.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c2f46c4 NIFI-7586 In CassandraSesionProvider added properties to set socket-level read timeout and connect timeout.
c2f46c4 is described below
commit c2f46c44ca29a07a0f418f6d46845f7ae7bccf91
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Mon Jun 29 16:39:58 2020 +0200
NIFI-7586 In CassandraSesionProvider added properties to set socket-level read timeout and connect timeout.
In QueryCassandra when writing flowfile to the sesion it's done on the raw OutputStream.
Wrapped it in a BufferedOutputStream for better performance.
This closes #4368.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi/processors/cassandra/QueryCassandra.java | 5 ++-
.../nifi/service/CassandraSessionProvider.java | 43 +++++++++++++++++++++-
.../nifi/service/TestCassandraSessionProvider.java | 2 +-
3 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index 197e983..ebad736 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -56,6 +56,7 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
@@ -210,8 +211,8 @@ public class QueryCassandra extends AbstractCassandraProcessor {
fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
@Override
- public void process(final OutputStream out) throws IOException {
- try {
+ public void process(final OutputStream rawOut) throws IOException {
+ try (final OutputStream out = new BufferedOutputStream(rawOut)) {
logger.debug("Executing CQL query {}", new Object[]{selectQuery});
final ResultSet resultSet;
if (queryTimeout > 0) {
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
index e645edc..a917ecb 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
@@ -25,7 +25,9 @@ import com.datastax.driver.core.Session;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import javax.net.ssl.SSLContext;
+import com.datastax.driver.core.SocketOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -126,6 +128,24 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
.defaultValue("NONE")
.build();
+ static final PropertyDescriptor READ_TIMEOUT_MS = new PropertyDescriptor.Builder()
+ .name("read-timeout-ms")
+ .displayName("Read Timout (ms)")
+ .description("Read timeout (in milliseconds). 0 means no timeout. If no value is set, the underlying default will be used.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor CONNECT_TIMEOUT_MS = new PropertyDescriptor.Builder()
+ .name("connect-timeout-ms")
+ .displayName("Connect Timout (ms)")
+ .description("Connection timeout (in milliseconds). 0 means no timeout. If no value is set, the underlying default will be used.")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
private List<PropertyDescriptor> properties;
private Cluster cluster;
private Session cassandraSession;
@@ -142,6 +162,8 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
props.add(USERNAME);
props.add(PASSWORD);
props.add(PROP_SSL_CONTEXT_SERVICE);
+ props.add(READ_TIMEOUT_MS);
+ props.add(CONNECT_TIMEOUT_MS);
properties = props;
}
@@ -238,8 +260,18 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
password = null;
}
+ PropertyValue readTimeoutMillisProperty = context.getProperty(READ_TIMEOUT_MS).evaluateAttributeExpressions();
+ Optional<Integer> readTimeoutMillisOptional = Optional.ofNullable(readTimeoutMillisProperty)
+ .filter(PropertyValue::isSet)
+ .map(PropertyValue::asInteger);
+
+ PropertyValue connectTimeoutMillisProperty = context.getProperty(CONNECT_TIMEOUT_MS).evaluateAttributeExpressions();
+ Optional<Integer> connectTimeoutMillisOptional = Optional.ofNullable(connectTimeoutMillisProperty)
+ .filter(PropertyValue::isSet)
+ .map(PropertyValue::asInteger);
+
// Create the cluster and connect to it
- Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType);
+ Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType, readTimeoutMillisOptional, connectTimeoutMillisOptional);
PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions();
final Session newSession;
if (keyspaceProperty != null) {
@@ -277,7 +309,8 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
}
private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
- String username, String password, String compressionType) {
+ String username, String password, String compressionType,
+ Optional<Integer> readTimeoutMillisOptional, Optional<Integer> connectTimeoutMillisOptional) {
Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
if (sslContext != null) {
@@ -297,6 +330,12 @@ public class CassandraSessionProvider extends AbstractControllerService implemen
builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
}
+ SocketOptions socketOptions = new SocketOptions();
+ readTimeoutMillisOptional.ifPresent(socketOptions::setReadTimeoutMillis);
+ connectTimeoutMillisOptional.ifPresent(socketOptions::setConnectTimeoutMillis);
+
+ builder.withSocketOptions(socketOptions);
+
return builder.build();
}
}
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
index 2f49d02..85d2d9a 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
@@ -46,7 +46,7 @@ public class TestCassandraSessionProvider {
public void testGetPropertyDescriptors() {
List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors();
- assertEquals(8, properties.size());
+ assertEquals(10, properties.size());
assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));