You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/01/30 16:48:53 UTC
nifi git commit: NIFI-3418 Add records-per-transaction property to
PutHiveStreaming processor
Repository: nifi
Updated Branches:
refs/heads/master 78a0e1e18 -> d5b139ffd
NIFI-3418 Add records-per-transaction property to PutHiveStreaming processor
Signed-off-by: Matt Burgess <ma...@apache.org>
This closes #1455
Minor whitespace Checkstyle issue fixed
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d5b139ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d5b139ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d5b139ff
Branch: refs/heads/master
Commit: d5b139ffd4fea53bc004e86087beba4c204eed70
Parents: 78a0e1e
Author: Ben Schofield <bs...@hortonworks.com>
Authored: Mon Jan 30 05:44:19 2017 -0500
Committer: Matt Burgess <ma...@apache.org>
Committed: Mon Jan 30 11:46:42 2017 -0500
----------------------------------------------------------------------
.../nifi/processors/hive/PutHiveStreaming.java | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d5b139ff/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index edb33dc..794b268 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -219,6 +219,16 @@ public class PutHiveStreaming extends AbstractProcessor {
.defaultValue("100")
.build();
+ public static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+ .name("hive-stream-records-per-transaction")
+ .displayName("Records per Transaction")
+ .description("Number of records to process before committing the transaction. This value must be greater than 1.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(GREATER_THAN_ONE_VALIDATOR)
+ .defaultValue("10000")
+ .build();
+
// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -270,6 +280,7 @@ public class PutHiveStreaming extends AbstractProcessor {
props.add(MAX_OPEN_CONNECTIONS);
props.add(HEARTBEAT_INTERVAL);
props.add(TXNS_PER_BATCH);
+ props.add(RECORDS_PER_TXN);
kerberosConfigFile = context.getKerberosConfigurationFile();
kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -357,7 +368,7 @@ public class PutHiveStreaming extends AbstractProcessor {
}
final ComponentLog log = getLogger();
- final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+ final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger();
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
@@ -482,8 +493,8 @@ public class PutHiveStreaming extends AbstractProcessor {
appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
}
- // If we've reached the transactions-per-batch limit, flush the Hive Writer and update the Avro Writer for successful records
- if (hiveWriter.getTotalRecords() >= txnsPerBatch) {
+ // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records
+ if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
hiveWriter.flush(true);
// Now send the records to the success relationship and update the success count
try {