You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/02/08 20:58:51 UTC

[nifi] branch master updated (4e914ce -> bc0e68f)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 4e914ce  NIFI-6008: When a Processor is Terminated, call its #onPropertyModified method for properties whose value differs from the default value. Previously, it was called if the value differs from the current value (which it never will since we are only restoring the current values for the Processor). This results in ensuring that #onPropertyModified is correctly called when a component is reloaded
     new 29d5412  NIFI-5989 - PutKudu: Additional FF Queue length setting
     new bc0e68f  NIFI-5989: Update to property names to clarify difference between batch sizes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/nifi/processors/kudu/PutKudu.java    | 21 +++++++++++++++++++--
 .../apache/nifi/processors/kudu/TestPutKudu.java    |  1 +
 2 files changed, 20 insertions(+), 2 deletions(-)


[nifi] 02/02: NIFI-5989: Update to property names to clarify difference between batch sizes

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit bc0e68ff2abbfc2a1f38f3fe34812eb4adfc92fd
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Feb 8 15:58:43 2019 -0500

    NIFI-5989: Update to property names to clarify difference between batch sizes
---
 .../src/main/java/org/apache/nifi/processors/kudu/PutKudu.java           | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1a1d9b8..b0eb3f9 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -142,6 +142,7 @@ public class PutKudu extends AbstractProcessor {
 
     protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
         .name("Batch Size")
+        .displayName("Max Records per Batch")
         .description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " +
             "Depending on your memory size, and data size per row set an appropriate batch size. " +
             "Gradually increase this number to find out the best one for best performances.")


[nifi] 01/02: NIFI-5989 - PutKudu: Additional FF Queue length setting

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 29d54126e4afac1e984912b1ccb0a5fb222c7928
Author: Alex Goos <ag...@mac.com>
AuthorDate: Wed Jan 30 17:31:20 2019 +0100

    NIFI-5989 - PutKudu: Additional FF Queue length setting
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../org/apache/nifi/processors/kudu/PutKudu.java     | 20 ++++++++++++++++++--
 .../org/apache/nifi/processors/kudu/TestPutKudu.java |  1 +
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 9605315..1a1d9b8 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -128,9 +128,21 @@ public class PutKudu extends AbstractProcessor {
         .required(true)
         .build();
 
+    protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("FlowFiles per Batch")
+        .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
+            "Depending on your memory size, and data size per row set an appropriate batch size " +
+            "for the number of FlowFiles to process per client connection setup." +
+            "Gradually increase this number, only if your FlowFiles typically contain a few records.")
+        .defaultValue("1")
+        .required(true)
+        .addValidator(StandardValidators.createLongValidator(1, 100000, true))
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .build();
+
     protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
         .name("Batch Size")
-        .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
+        .description("The maximum number of Records to process in a single Kudu-client batch, between 1 - 100000. " +
             "Depending on your memory size, and data size per row set an appropriate batch size. " +
             "Gradually increase this number to find out the best one for best performances.")
         .defaultValue("100")
@@ -139,6 +151,7 @@ public class PutKudu extends AbstractProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .build();
 
+
     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
         .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
@@ -153,6 +166,7 @@ public class PutKudu extends AbstractProcessor {
     protected OperationType operationType;
     protected SessionConfiguration.FlushMode flushMode;
     protected int batchSize = 100;
+    protected int ffbatch   = 1;
 
     protected KuduClient kuduClient;
     protected KuduTable kuduTable;
@@ -166,6 +180,7 @@ public class PutKudu extends AbstractProcessor {
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
+        properties.add(FLOWFILE_BATCH_SIZE);
         properties.add(BATCH_SIZE);
 
         return properties;
@@ -186,6 +201,7 @@ public class PutKudu extends AbstractProcessor {
         final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
         operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
         batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        ffbatch   = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
 
         getLogger().debug("Setting up Kudu connection...");
@@ -209,7 +225,7 @@ public class PutKudu extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final List<FlowFile> flowFiles = session.get(batchSize);
+        final List<FlowFile> flowFiles = session.get(ffbatch);
         if (flowFiles.isEmpty()) {
             return;
         }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 041b506..51908f2 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -480,6 +480,7 @@ public class TestPutKudu {
         setUpTestRunner(testRunner);
         testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name());
         testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize));
+        testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, String.valueOf(batchSize));
 
         IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue(""));
         testRunner.run(numFlowFiles);