You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/09/06 17:33:56 UTC

nifi git commit: NIFI-4342 - Add EL support to PutHiveStreaming

Repository: nifi
Updated Branches:
  refs/heads/master 458c987fe -> 9ac88d210


NIFI-4342 - Add Expression Language (EL) support to PutHiveStreaming

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2120


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9ac88d21
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9ac88d21
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9ac88d21

Branch: refs/heads/master
Commit: 9ac88d210a181b36679db701766c35d662fa7ce2
Parents: 458c987
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Sep 1 11:33:23 2017 +0200
Committer: Matthew Burgess <ma...@apache.org>
Committed: Wed Sep 6 13:33:18 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/hive/PutHiveStreaming.java      | 12 ++++++++----
 .../nifi/processors/hive/TestPutHiveStreaming.java  | 16 ++++++++++++----
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9ac88d21/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index fe677e5..afb99fd 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -143,6 +143,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the "
                     + "Hive metastore is 9043.")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.URI_VALIDATOR)
             .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))) // no start with / or end with /
             .build();
@@ -162,6 +163,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .displayName("Database Name")
             .description("The name of the database in which to put the data.")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -170,6 +172,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .displayName("Table Name")
             .description("The name of the database table in which to put the data.")
             .required(true)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
@@ -179,6 +182,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
             .description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must "
                     + "correspond exactly to the order of partition columns specified during the table creation.")
             .required(false)
+            .expressionLanguageSupported(true)
             .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))) // comma-separated list with non-empty entries
             .build();
 
@@ -322,9 +326,9 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
     public void setup(final ProcessContext context) {
         ComponentLog log = getLogger();
 
-        final String metastoreUri = context.getProperty(METASTORE_URI).getValue();
-        final String dbName = context.getProperty(DB_NAME).getValue();
-        final String tableName = context.getProperty(TABLE_NAME).getValue();
+        final String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions().getValue();
+        final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions().getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
         final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
         final Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
         final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
@@ -565,7 +569,7 @@ public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
 
         final List<String> partitionColumnList;
-        final String partitionColumns = context.getProperty(PARTITION_COLUMNS).getValue();
+        final String partitionColumns = context.getProperty(PARTITION_COLUMNS).evaluateAttributeExpressions().getValue();
         if (partitionColumns == null || partitionColumns.isEmpty()) {
             partitionColumnList = Collections.emptyList();
         } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9ac88d21/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
index 6198619..e57bb08 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java
@@ -462,12 +462,20 @@ public class TestPutHiveStreaming {
 
     @Test
     public void onTriggerWithPartitionColumns() throws Exception {
-        runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
-        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
-        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        runner.setVariable("metastore", "thrift://localhost:9083");
+        runner.setVariable("database", "default");
+        runner.setVariable("table", "users");
+        runner.setVariable("partitions", "favorite_number, favorite_color");
+
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, "${metastore}");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "${database}");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "${table}");
         runner.setProperty(PutHiveStreaming.TXNS_PER_BATCH, "100");
-        runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "favorite_number, favorite_color");
+        runner.setProperty(PutHiveStreaming.PARTITION_COLUMNS, "${partitions}");
         runner.setProperty(PutHiveStreaming.AUTOCREATE_PARTITIONS, "true");
+
+        runner.assertValid();
+
         Map<String, Object> user1 = new HashMap<String, Object>() {
             {
                 put("name", "Joe");