You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/09/03 19:48:33 UTC
[nifi] branch master updated: - Removed unused
AUTOCREATE_PARTITIONS from PutHive3Streaming - Renamed PARTITION_VALUES to
STATIC_PARTITION_VALUES for correctness and better understanding -
STATIC_PARTITION_VALUES description clearly states that having that
property filled implies Hive Static Partitioning
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 9071e5b - Removed unused AUTOCREATE_PARTITIONS from PutHive3Streaming - Renamed PARTITION_VALUES to STATIC_PARTITION_VALUES for correctness and better understanding - STATIC_PARTITION_VALUES description clearly states that having that property filled implies Hive Static Partitioning
9071e5b is described below
commit 9071e5baa7a8af03392fce703d539e9fa94980be
Author: Alessandro D'Armiento <al...@agilelab.it>
AuthorDate: Sat Aug 3 13:49:44 2019 +0200
- Removed unused AUTOCREATE_PARTITIONS from PutHive3Streaming
- Renamed PARTITION_VALUES to STATIC_PARTITION_VALUES for correctness and better understanding
- STATIC_PARTITION_VALUES description clearly states that having that property filled implies Hive Static Partitioning
NIFI-6536: Additional documentation for Static Partition Values
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3631
---
.../nifi/processors/hive/PutHive3Streaming.java | 35 ++++++++--------------
.../org/apache/nifi/util/hive/HiveOptions.java | 6 ----
2 files changed, 12 insertions(+), 29 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 2224b06..5558c79 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -84,9 +84,10 @@ import java.util.stream.Collectors;
import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES;
@Tags({"hive", "streaming", "put", "database", "store"})
-@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. "
- + "The partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
- + "each record should be field A.")
+@CapabilityDescription("This processor uses Hive Streaming to send flow file records to an Apache Hive 3.0+ table. If 'Static Partition Values' is not set, then "
+ + "the partition values are expected to be the 'last' fields of each record, so if the table is partitioned on column A for example, then the last field in "
+ + "each record should be field A. If 'Static Partition Values' is set, those values will be used as the partition values, and any record fields corresponding to "
+ + "partition columns will be ignored.")
@WritesAttributes({
@WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
+ "and 'failure' relationships, and contains the number of records from the incoming flow file. All records in a flow file are committed as a single transaction."),
@@ -151,28 +152,19 @@ public class PutHive3Streaming extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- static final PropertyDescriptor PARTITION_VALUES = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor STATIC_PARTITION_VALUES = new PropertyDescriptor.Builder()
.name("hive3-stream-part-vals")
- .displayName("Partition Values")
+ .displayName("Static Partition Values")
.description("Specifies a comma-separated list of the values for the partition columns of the target table. If the incoming records all have the same values "
+ "for the partition columns, those values can be entered here, resulting in a performance gain. If specified, this property will often contain "
+ "Expression Language, for example if PartitionRecord is upstream and two partitions 'name' and 'age' are used, then this property can be set to "
- + "${name},${age}.")
+ + "${name},${age}. If this property is set, the values will be used as the partition values, and any record fields corresponding to "
+ + "partition columns will be ignored. If this property is not set, then the partition values are expected to be the last fields of each record.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
- static final PropertyDescriptor AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder()
- .name("hive3-stream-autocreate-partition")
- .displayName("Auto-Create Partitions")
- .description("Flag indicating whether partitions should be automatically created")
- .required(true)
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
-
static final PropertyDescriptor CALL_TIMEOUT = new PropertyDescriptor.Builder()
.name("hive3-stream-call-timeout")
.displayName("Call Timeout")
@@ -250,8 +242,7 @@ public class PutHive3Streaming extends AbstractProcessor {
props.add(HIVE_CONFIGURATION_RESOURCES);
props.add(DB_NAME);
props.add(TABLE_NAME);
- props.add(PARTITION_VALUES);
- props.add(AUTOCREATE_PARTITIONS);
+ props.add(STATIC_PARTITION_VALUES);
props.add(CALL_TIMEOUT);
props.add(DISABLE_STREAMING_OPTIMIZATIONS);
props.add(ROLLBACK_ON_FAILURE);
@@ -362,8 +353,7 @@ public class PutHive3Streaming extends AbstractProcessor {
}
}
- final String partitionValuesString = context.getProperty(PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
- final boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
+ final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
// Override the Hive Metastore URIs in the config if set by the user
@@ -373,12 +363,11 @@ public class PutHive3Streaming extends AbstractProcessor {
HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
.withHiveConf(hiveConfig)
- .withAutoCreatePartitions(autoCreatePartitions)
.withCallTimeout(callTimeout)
.withStreamingOptimizations(!disableStreamingOptimizations);
- if (!StringUtils.isEmpty(partitionValuesString)) {
- List<String> staticPartitionValues = Arrays.stream(partitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
+ if (!StringUtils.isEmpty(staticPartitionValuesString)) {
+ List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
o = o.withStaticPartitionValues(staticPartitionValues);
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
index ca6e6eb..82f6856 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveOptions.java
@@ -32,7 +32,6 @@ public class HiveOptions implements Serializable {
protected Integer idleTimeout = 60000;
protected Integer callTimeout = 0;
protected List<String> staticPartitionValues = null;
- protected Boolean autoCreatePartitions = true;
protected String kerberosPrincipal;
protected String kerberosKeytab;
protected HiveConf hiveConf;
@@ -54,11 +53,6 @@ public class HiveOptions implements Serializable {
return this;
}
- public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
- this.autoCreatePartitions = autoCreatePartitions;
- return this;
- }
-
public HiveOptions withKerberosKeytab(String kerberosKeytab) {
this.kerberosKeytab = kerberosKeytab;
return this;