You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/06 17:33:56 UTC
nifi git commit: NIFI-4342 - Add EL support to PutHiveStreaming
Repository: nifi
Updated Branches:
refs/heads/master 458c987fe -> 9ac88d210
NIFI-4342 - Add EL support to PutHiveStreaming
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2120
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9ac88d21
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9ac88d21
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9ac88d21
Branch: refs/heads/master
Commit: 9ac88d210a181b36679db701766c35d662fa7ce2
Parents: 458c987
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Sep 1 11:33:23 2017 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Wed Sep 6 13:33:18 2017 -0400
----------------------------------------------------------------------
.../nifi/processors/hive/PutHiveStreaming.java | 12 ++++++++----
.../nifi/processors/hive/TestPutHiveStreaming.java | 16 ++++++++++++----
2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/9ac88d21/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index fe677e5..afb99fd 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -143,6 +143,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
+ "Hive metastore is 9043.")
.required(true)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.URI_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
.build();
@@ -162,6 +163,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.displayName("Database Name")
.description("The name of the database in which to put the data.")
.required(true)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -170,6 +172,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.displayName("Table Name")
.description("The name of the database table in which to put the data.")
.required(true)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -179,6 +182,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
.description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
+ "correspond exactly to the order of partition columns specified during the table creation.")
.required(false)
+ .expressionLanguageSupported(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
.build();
@@ -322,9 +326,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
public void setup(final ProcessContext context) {
ComponentLog log = getLogger();
- final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
- final String dbName = context.getProperty(DB_NAME).getValue();
- final String tableName = context.getProperty(TABLE_NAME).getValue();
+ final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
+ final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions().getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
@@ -565,7 +569,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
final List<String> partitionColumnList;
- final String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
+ final String partitionColumns = context.getProperty(PARTITION_COLUMNS).evaluateAttributeExpressions().getValue();
if (partitionColumns == null || partitionColumns.isEmpty()) {
partitionColumnList = Collections.emptyList();
} else {
http://git-wip-us.apache.org/repos/asf/nifi/blob/9ac88d21/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index 6198619..e57bb08 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -462,12 +462,20 @@ public class TestPutHiveStreaming {
@Test
public void onTriggerWithPartitionColumns() throws Exception {
- runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
- runner.setProperty(PutHiveStreaming.DB_NAME, "default");
- runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+ runner.setVariable("metastore", "thrift://localhost:9083");
+ runner.setVariable("database", "default");
+ runner.setVariable("table", "users");
+ runner.setVariable("partitions", "favorite_number, favorite_color");
+
+ runner.setProperty(PutHiveStreaming.METASTORE_URI, "${metastore}");
+ runner.setProperty(PutHiveStreaming.DB_NAME, "${database}");
+ runner.setProperty(PutHiveStreaming.TABLE_NAME, "${table}");
runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
- runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color");
+ runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "${partitions}");
runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
+
+ runner.assertValid();
+
Map<String, Object> user1 = new HashMap<String, Object>() {
{
put("name", "Joe");