You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/08 20:58:52 UTC

[nifi] 01/02: NIFI-5989 - PutKudu: Additional FF Queue length setting

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 29d54126e4afac1e984912b1ccb0a5fb222c7928
Author: Alex Goos <ag...@mac.com>
AuthorDate: Wed Jan 30 17:31:20 2019 +0100

    NIFI-5989 - PutKudu: Additional FF Queue length setting
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../org/apache/nifi/processors/kudu/PutKudu.java     | 20 ++++++++++++++++++--
 .../org/apache/nifi/processors/kudu/TestPutKudu.java |  1 +
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 9605315..1a1d9b8 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -128,9 +128,21 @@ public class PutKudu extends AbstractProcessor {
         .required(true)
         .build();
 
+    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder() // caps how many FlowFiles onTrigger pulls per execution
+        .name("FlowFiles per Batch")
+        .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
+            "Depending on your memory size, and data size per row set an appropriate batch size " +
+            "for the number of FlowFiles to process per client connection setup. " +
+            "Gradually increase this number, only if your FlowFiles typically contain a few records.")
+        .defaultValue("1")
+        .required(true)
+        .addValidator(StandardValidators.createLongValidator(1, 100000, true)) // range mirrors the description; value is later read with asInteger()
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .build();
+
     protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
         .name("Batch Size")
-        .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
+        .description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " +
             "Depending on your memory size, and data size per row set an appropriate batch size. " +
             "Gradually increase this number to find out the best one for best performances.")
         .defaultValue("100")
@@ -139,6 +151,7 @@ public class PutKudu extends AbstractProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
 
+
     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
@@ -153,6 +166,7 @@ public class PutKudu extends AbstractProcessor {
     protected OperationType operationType;
     protected SessionConfiguration.FlushMode flushMode;
     protected int batchSize = 100;
+    protected int ffbatch   = 1;
 
     protected KuduClient kuduClient;
     protected KuduTable kuduTable;
@@ -166,6 +180,7 @@ public class PutKudu extends AbstractProcessor {
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
+        properties.add(FLOWFILE_BATCH_SIZE);
         properties.add(BATCH_SIZE);
 
         return properties;
@@ -186,6 +201,7 @@ public class PutKudu extends AbstractProcessor {
         final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
         operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
         batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        ffbatch   = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
 
         getLogger().debug("Setting up Kudu connection...");
@@ -209,7 +225,7 @@ public class PutKudu extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final List<FlowFile> flowFiles = session.get(batchSize);
+        final List<FlowFile> flowFiles = session.get(ffbatch);
         if (flowFiles.isEmpty()) {
             return;
         }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 041b506..51908f2 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -480,6 +480,7 @@ public class TestPutKudu {
         setUpTestRunner(testRunner);
         testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name());
         testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize));
+        testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, String.valueOf(batchSize));
 
         IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue(""));
         testRunner.run(numFlowFiles);