You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/09/03 19:48:33 UTC

[nifi] branch master updated: - Removed unused AUTOCREATE_PARTITIONS from PutHive3Streaming - Renamed PARTITION_VALUES to STATIC_PARTITION_VALUES for correctness and better understanding - STATIC_PARTITION_VALUES description clearly states that having that property filled implies Hive Static Partitioning

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9071e5b  - Removed unused AUTOCREATE_PARTITIONS from PutHive3Streaming - Renamed PARTITION_VALUES to STATIC_PARTITION_VALUES for correctness and better understanding - STATIC_PARTITION_VALUES description clearly states that having that property filled implies Hive Static Partitioning
9071e5b is described below

commit 9071e5baa7a8af03392fce703d539e9fa94980be
Author: Alessandro D'Armiento <al...@agilelab.it>
AuthorDate: Sat Aug 3 13:49:44 2019 +0200

    - Removed unused AUTOCREATE_PARTITIONS from PutHive3Streaming
    - Renamed PARTITION_VALUES to STATIC_PARTITION_VALUES for correctness and better understanding
    - STATIC_PARTITION_VALUES description clearly states that having that property filled implies Hive Static Partitioning
    
    NIFI-6536: Additional documentation for Static Partition Values
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3631
---
 .../nifi/processors/hive/PutHive3Streaming.java    | 35 ++++++++--------------
 .../org/apache/nifi/util/hive/HiveOptions.java     |  6 ----
 2 files changed, 12 insertions(+), 29 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 2224b06..5558c79 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -84,9 +84,10 @@ import java.util.stream.Collectors;
 import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
 
 @Tags({"hive", "streaming", "put", "database", "store"})
-@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
-        + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
-        + "each record should be field A.")
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. If 'Static Partition Values' is not set, then "
+        + "the partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
+        + "each record should be field A. If 'Static Partition Values' is set, those values will be used as the partition values, and any record fields corresponding to "
+        + "partition columns will be ignored.")
 @WritesAttributes({
         @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
                 + "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
@@ -151,28 +152,19 @@ public class PutHive3Streaming extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
             .name("hive3-stream-part-vals")
-            .displayName("Partition Values")
+            .displayName("Static Partition Values")
             .description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
                     + "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
                     + "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
-                    + "${name},${age}.")
+                    + "${name},${age}. If this property is set, the values will be used as the partition values, and any record fields corresponding to "
+                    + "partition columns will be ignored. If this property is not set, then the partition values are expected to be the last fields of each record.")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
-            .name("hive3-stream-autocreate-partition")
-            .displayName("Auto-Create Partitions")
-            .description("Flag indicating whether partitions should be automatically created")
-            .required(true)
-            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-            .allowableValues("true", "false")
-            .defaultValue("true")
-            .build();
-
     static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
             .name("hive3-stream-call-timeout")
             .displayName("Call Timeout")
@@ -250,8 +242,7 @@ public class PutHive3Streaming extends AbstractProcessor {
         props.add(HIVE_CONFIGURATION_RESOURCES);
         props.add(DB_NAME);
         props.add(TABLE_NAME);
-        props.add(PARTITION_VALUES);
-        props.add(AUTOCREATE_PARTITIONS);
+        props.add(STATIC_PARTITION_VALUES);
         props.add(CALL_TIMEOUT);
         props.add(DISABLE_STREAMING_OPTIMIZATIONS);
         props.add(ROLLBACK_ON_FAILURE);
@@ -362,8 +353,7 @@ public class PutHive3Streaming extends AbstractProcessor {
             }
         }
 
-        final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
-        final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+        final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
         final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
 
         // Override the Hive Metastore URIs in the config if set by the user
@@ -373,12 +363,11 @@ public class PutHive3Streaming extends AbstractProcessor {
 
         HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
                 .withHiveConf(hiveConfig)
-                .withAutoCreatePartitions(autoCreatePartitions)
                 .withCallTimeout(callTimeout)
                 .withStreamingOptimizations(!disableStreamingOptimizations);
 
-        if (!StringUtils.isEmpty(partitionValuesString)) {
-            List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
+        if (!StringUtils.isEmpty(staticPartitionValuesString)) {
+            List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
             o = o.withStaticPartitionValues(staticPartitionValues);
         }
 
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
index ca6e6eb..82f6856 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
@@ -32,7 +32,6 @@ public class HiveOptions implements Serializable {
     protected Integer idleTimeout = 60000;
     protected Integer callTimeout = 0;
     protected List<String> staticPartitionValues = null;
-    protected Boolean autoCreatePartitions = true;
     protected String kerberosPrincipal;
     protected String kerberosKeytab;
     protected HiveConf hiveConf;
@@ -54,11 +53,6 @@ public class HiveOptions implements Serializable {
         return this;
     }
 
-    public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
-        this.autoCreatePartitions = autoCreatePartitions;
-        return this;
-    }
-
     public HiveOptions withKerberosKeytab(String kerberosKeytab) {
         this.kerberosKeytab = kerberosKeytab;
         return this;