You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/01/30 16:48:53 UTC

nifi git commit: NIFI-3418 Add records-per-transaction property to PutHiveStreaming processor

Repository: nifi
Updated Branches:
  refs/heads/master 78a0e1e18 -> d5b139ffd


NIFI-3418 Add records-per-transaction property to PutHiveStreaming processor

Signed-off-by: Matt Burgess <ma...@apache.org>

This closes #1455

Fixed a minor Checkstyle whitespace issue.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d5b139ff
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d5b139ff
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d5b139ff

Branch: refs/heads/master
Commit: d5b139ffd4fea53bc004e86087beba4c204eed70
Parents: 78a0e1e
Author: Ben Schofield <bs...@hortonworks.com>
Authored: Mon Jan 30 05:44:19 2017 -0500
Committer: Matt Burgess <ma...@apache.org>
Committed: Mon Jan 30 11:46:42 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/hive/PutHiveStreaming.java     | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d5b139ff/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
index edb33dc..794b268 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java
@@ -219,6 +219,16 @@ public class PutHiveStreaming extends AbstractProcessor {
             .defaultValue("100")
             .build();
 
+    public static final PropertyDescriptor RECORDS_PER_TXN = new PropertyDescriptor.Builder()
+            .name("hive-stream-records-per-transaction")
+            .displayName("Records per Transaction")
+            .description("Number of records to process before committing the transaction. This value must be greater than 1.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(GREATER_THAN_ONE_VALIDATOR)
+            .defaultValue("10000")
+            .build();
+
     // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -270,6 +280,7 @@ public class PutHiveStreaming extends AbstractProcessor {
         props.add(MAX_OPEN_CONNECTIONS);
         props.add(HEARTBEAT_INTERVAL);
         props.add(TXNS_PER_BATCH);
+        props.add(RECORDS_PER_TXN);
 
         kerberosConfigFile = context.getKerberosConfigurationFile();
         kerberosProperties = new KerberosProperties(kerberosConfigFile);
@@ -357,7 +368,7 @@ public class PutHiveStreaming extends AbstractProcessor {
         }
 
         final ComponentLog log = getLogger();
-        final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
+        final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).asInteger();
 
         // Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
         ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
@@ -482,8 +493,8 @@ public class PutHiveStreaming extends AbstractProcessor {
                                     appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader);
                                 }
 
-                                // If we've reached the transactions-per-batch limit, flush the Hive Writer and update the Avro Writer for successful records
-                                if (hiveWriter.getTotalRecords() >= txnsPerBatch) {
+                                // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records
+                                if (hiveWriter.getTotalRecords() >= recordsPerTxn) {
                                     hiveWriter.flush(true);
                                     // Now send the records to the success relationship and update the success count
                                     try {