You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/08/09 13:16:09 UTC
[nifi] branch main updated: NIFI-8955 Add Max Connection Lifetime
property to Hive(_1_1)ConnectionPool CS
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f57685e NIFI-8955 Add Max Connection Lifetime property to Hive(_1_1)ConnectionPool CS
f57685e is described below
commit f57685e971c44c62bcebf8b964cf427bbc30b7c8
Author: Denes Arvay <de...@apache.org>
AuthorDate: Thu Jul 22 16:36:29 2021 +0200
NIFI-8955 Add Max Connection Lifetime property to Hive(_1_1)ConnectionPool CS
This closes #5259.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-hive-bundle/nifi-hive-processors/pom.xml | 5 ++++
.../apache/nifi/dbcp/hive/HiveConnectionPool.java | 34 ++++++++++++++++++++--
.../nifi/dbcp/hive/HiveConnectionPoolTest.java | 10 +++++--
.../nifi-hive_1_1-processors/pom.xml | 5 ++++
.../nifi/dbcp/hive/Hive_1_1ConnectionPool.java | 33 +++++++++++++++++++--
.../nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java | 10 +++++--
6 files changed, 85 insertions(+), 12 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
index a762844..777a5ab 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml
@@ -128,6 +128,11 @@
<version>1.8</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
index 4454386..959c31b 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/dbcp/hive/HiveConnectionPool.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.dbcp.hive;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
@@ -33,6 +34,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPValidator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
@@ -72,6 +74,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
+ private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("hive-db-connect-url")
.displayName("Database Connection URL")
@@ -137,6 +141,18 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+ .displayName("Max Connection Lifetime")
+ .name("hive-max-conn-lifetime")
+ .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+ "connection pool will invalidate the connection. A value of zero or -1 " +
+ "means the connection has an infinite lifetime.")
+ .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+ .required(true)
+ .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
@@ -181,6 +197,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
+ props.add(MAX_CONN_LIFETIME);
props.add(VALIDATION_QUERY);
props.add(KERBEROS_CREDENTIALS_SERVICE);
@@ -335,14 +352,16 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+ final long maxConnectionLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
- dataSource.setMaxWait(maxWaitMillis);
- dataSource.setMaxActive(maxTotal);
+ dataSource.setMaxWaitMillis(maxWaitMillis);
+ dataSource.setMaxTotal(maxTotal);
+ dataSource.setMaxConnLifetimeMillis(maxConnectionLifetimeMillis);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
@@ -426,4 +445,13 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
boolean isAllowExplicitKeytab() {
return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
}
+
+ private long extractMillisWithInfinite(PropertyValue prop) {
+ if (prop.getValue() == null || DEFAULT_MAX_CONN_LIFETIME.equals(prop.getValue())) {
+ return -1;
+ } else {
+ return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+ }
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
index b109740..be8c2c5 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/dbcp/hive/HiveConnectionPoolTest.java
@@ -31,7 +31,7 @@ import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
@@ -131,6 +131,7 @@ public class HiveConnectionPoolTest {
final String USER = "user";
final String PASS = "pass";
final int MAX_CONN = 7;
+ final String MAX_CONN_LIFETIME = "1 sec";
final String MAX_WAIT = "10 sec"; // 10000 milliseconds
final String CONF = "/path/to/hive-site.xml";
hiveConnectionPool = new HiveConnectionPool();
@@ -140,6 +141,7 @@ public class HiveConnectionPoolTest {
put(HiveConnectionPool.DB_USER, "${username}");
put(HiveConnectionPool.DB_PASSWORD, "${password}");
put(HiveConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+ put(HiveConnectionPool.MAX_CONN_LIFETIME, "${maxconnlifetime}");
put(HiveConnectionPool.MAX_WAIT_TIME, "${maxwait}");
put(HiveConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
}};
@@ -149,6 +151,7 @@ public class HiveConnectionPoolTest {
registry.setVariable(new VariableDescriptor("username"), USER);
registry.setVariable(new VariableDescriptor("password"), PASS);
registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+ registry.setVariable(new VariableDescriptor("maxconnlifetime"), MAX_CONN_LIFETIME);
registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
@@ -162,8 +165,9 @@ public class HiveConnectionPoolTest {
assertEquals(URL, basicDataSource.getUrl());
assertEquals(USER, basicDataSource.getUsername());
assertEquals(PASS, basicDataSource.getPassword());
- assertEquals(MAX_CONN, basicDataSource.getMaxActive());
- assertEquals(10000L, basicDataSource.getMaxWait());
+ assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
+ assertEquals(1000L, basicDataSource.getMaxConnLifetimeMillis());
+ assertEquals(10000L, basicDataSource.getMaxWaitMillis());
assertEquals(URL, hiveConnectionPool.getConnectionURL());
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
index 3fdaed2..d00eb17 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/pom.xml
@@ -132,6 +132,11 @@
<version>1.8</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <version>2.7.0</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
index 2b47335..ebd0942 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/main/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPool.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.dbcp.hive;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.jdbc.HiveDriver;
@@ -26,6 +26,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceCardinality;
@@ -33,6 +34,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.dbcp.DBCPValidator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.hadoop.SecurityUtil;
@@ -69,6 +71,9 @@ import java.util.concurrent.atomic.AtomicReference;
@Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive 1.1.x. Connections can be asked from pool and returned after usage.")
public class Hive_1_1ConnectionPool extends AbstractControllerService implements Hive_1_1DBCPService {
+
+ private static final String DEFAULT_MAX_CONN_LIFETIME = "-1";
+
public static final PropertyDescriptor DATABASE_URL = new PropertyDescriptor.Builder()
.name("hive-db-connect-url")
.displayName("Database Connection URL")
@@ -134,6 +139,18 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+ .displayName("Max Connection Lifetime")
+ .name("hive-max-conn-lifetime")
+ .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+ "connection pool will invalidate the connection. A value of zero or -1 " +
+ "means the connection has an infinite lifetime.")
+ .defaultValue(DEFAULT_MAX_CONN_LIFETIME)
+ .required(true)
+ .addValidator(DBCPValidator.CUSTOM_TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
public static final PropertyDescriptor VALIDATION_QUERY = new PropertyDescriptor.Builder()
.name("Validation-query")
.displayName("Validation query")
@@ -195,6 +212,7 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
props.add(DB_PASSWORD);
props.add(MAX_WAIT_TIME);
props.add(MAX_TOTAL_CONNECTIONS);
+ props.add(MAX_CONN_LIFETIME);
props.add(VALIDATION_QUERY);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
@@ -334,14 +352,16 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
final String passw = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
+ final long maxConnectionLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
dataSource = new BasicDataSource();
dataSource.setDriverClassName(drv);
connectionUrl = context.getProperty(DATABASE_URL).evaluateAttributeExpressions().getValue();
- dataSource.setMaxWait(maxWaitMillis);
- dataSource.setMaxActive(maxTotal);
+ dataSource.setMaxWaitMillis(maxWaitMillis);
+ dataSource.setMaxTotal(maxTotal);
+ dataSource.setMaxConnLifetimeMillis(maxConnectionLifetimeMillis);
if (validationQuery != null && !validationQuery.isEmpty()) {
dataSource.setValidationQuery(validationQuery);
@@ -421,4 +441,11 @@ public class Hive_1_1ConnectionPool extends AbstractControllerService implements
return connectionUrl;
}
+ private long extractMillisWithInfinite(PropertyValue prop) {
+ if (prop.getValue() == null || DEFAULT_MAX_CONN_LIFETIME.equals(prop.getValue())) {
+ return -1;
+ } else {
+ return prop.asTimePeriod(TimeUnit.MILLISECONDS);
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
index d129084..5881dba 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive_1_1-processors/src/test/java/org/apache/nifi/dbcp/hive/Hive_1_1ConnectionPoolTest.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
@@ -113,6 +113,7 @@ public class Hive_1_1ConnectionPoolTest {
final String USER = "user";
final String PASS = "pass";
final int MAX_CONN = 7;
+ final String MAX_CONN_LIFETIME = "1 sec";
final String MAX_WAIT = "10 sec"; // 10000 milliseconds
final String CONF = "/path/to/hive-site.xml";
hiveConnectionPool = new Hive_1_1ConnectionPool();
@@ -122,6 +123,7 @@ public class Hive_1_1ConnectionPoolTest {
put(Hive_1_1ConnectionPool.DB_USER, "${username}");
put(Hive_1_1ConnectionPool.DB_PASSWORD, "${password}");
put(Hive_1_1ConnectionPool.MAX_TOTAL_CONNECTIONS, "${maxconn}");
+ put(Hive_1_1ConnectionPool.MAX_CONN_LIFETIME, "${maxconnlifetime}");
put(Hive_1_1ConnectionPool.MAX_WAIT_TIME, "${maxwait}");
put(Hive_1_1ConnectionPool.HIVE_CONFIGURATION_RESOURCES, "${hiveconf}");
}};
@@ -131,6 +133,7 @@ public class Hive_1_1ConnectionPoolTest {
registry.setVariable(new VariableDescriptor("username"), USER);
registry.setVariable(new VariableDescriptor("password"), PASS);
registry.setVariable(new VariableDescriptor("maxconn"), Integer.toString(MAX_CONN));
+ registry.setVariable(new VariableDescriptor("maxconnlifetime"), MAX_CONN_LIFETIME);
registry.setVariable(new VariableDescriptor("maxwait"), MAX_WAIT);
registry.setVariable(new VariableDescriptor("hiveconf"), CONF);
@@ -144,8 +147,9 @@ public class Hive_1_1ConnectionPoolTest {
assertEquals(URL, basicDataSource.getUrl());
assertEquals(USER, basicDataSource.getUsername());
assertEquals(PASS, basicDataSource.getPassword());
- assertEquals(MAX_CONN, basicDataSource.getMaxActive());
- assertEquals(10000L, basicDataSource.getMaxWait());
+ assertEquals(MAX_CONN, basicDataSource.getMaxTotal());
+ assertEquals(1000L, basicDataSource.getMaxConnLifetimeMillis());
+ assertEquals(10000L, basicDataSource.getMaxWaitMillis());
assertEquals(URL, hiveConnectionPool.getConnectionURL());
}