You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/09/22 09:09:29 UTC

nifi git commit: NIFI-4384 - Enhance PutKudu processor to support batch insert

Repository: nifi
Updated Branches:
  refs/heads/master 329dbe3a6 -> 0c0c33411


NIFI-4384 - Enhance PutKudu processor to support batch insert

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2160.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0c0c3341
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0c0c3341
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0c0c3341

Branch: refs/heads/master
Commit: 0c0c33411d586af8c0c195181747e9d55709a574
Parents: 329dbe3
Author: cam <ca...@inspur.com>
Authored: Thu Sep 14 15:29:08 2017 -0700
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Sep 22 10:56:15 2017 +0200

----------------------------------------------------------------------
 .../nifi/processors/kudu/AbstractKudu.java      | 43 ++++++++++++++++++--
 .../apache/nifi/processors/kudu/PutKudu.java    |  4 +-
 2 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0c0c3341/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
index 5019e03..359e817 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
@@ -28,6 +28,7 @@ import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.Upsert;
+import org.apache.kudu.client.SessionConfiguration;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -60,6 +61,7 @@ public abstract class AbstractKudu extends AbstractProcessor {
             .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
@@ -67,6 +69,7 @@ public abstract class AbstractKudu extends AbstractProcessor {
             .description("The name of the Kudu Table to put data into")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
@@ -94,6 +97,29 @@ public abstract class AbstractKudu extends AbstractProcessor {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor FLUSH_MODE = new PropertyDescriptor.Builder()
+            .name("Flush Mode")
+            .description("Set the new flush mode for a kudu session.\n" +
+                    "AUTO_FLUSH_SYNC: the call returns when the operation is persisted, else it throws an exception.\n" +
+                    "AUTO_FLUSH_BACKGROUND: the call returns when the operation has been added to the buffer. This call should normally perform only fast in-memory" +
+                    " operations but it may have to wait when the buffer is full and there's another buffer being flushed.\n" +
+                    "MANUAL_FLUSH: the call returns when the operation has been added to the buffer, else it throws a KuduException if the buffer is full.")
+            .allowableValues(SessionConfiguration.FlushMode.values())
+            .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Batch Size")
+            .description("Set the number of operations that can be buffered, between 2 - 100000. " +
+                    "Depending on your memory size, and data size per row set an appropriate batch size. " +
+                    "Gradually increase this number to find out the best one for best performances.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(2, 100000, true))
+            .expressionLanguageSupported(true)
+            .build();
+
     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
@@ -109,6 +135,8 @@ public abstract class AbstractKudu extends AbstractProcessor {
     protected String tableName;
     protected boolean skipHeadLine;
     protected OperationType operationType;
+    protected SessionConfiguration.FlushMode flushMode;
+    protected int batchSize = 100;
 
     protected KuduClient kuduClient;
     protected KuduTable kuduTable;
@@ -116,19 +144,22 @@ public abstract class AbstractKudu extends AbstractProcessor {
     @OnScheduled
     public void OnScheduled(final ProcessContext context) {
         try {
-            tableName = context.getProperty(TABLE_NAME).getValue();
-            kuduMasters = context.getProperty(KUDU_MASTERS).getValue();
+            tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+            kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
             if(kuduClient == null) {
                 getLogger().debug("Setting up Kudu connection...");
                 kuduClient = getKuduConnection(kuduMasters);
                 kuduTable = this.getKuduTable(kuduClient, tableName);
                 getLogger().debug("Kudu connection successfully initialized");
             }
+
+            operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
+            batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+            flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
+            skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
         } catch(KuduException ex){
             getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
         }
-        operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
-        skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
     }
 
     @OnStopped
@@ -223,6 +254,10 @@ public abstract class AbstractKudu extends AbstractProcessor {
     protected KuduSession getKuduSession(KuduClient client){
 
         KuduSession kuduSession = client.newSession();
+
+        kuduSession.setMutationBufferSpace(batchSize);
+        kuduSession.setFlushMode(flushMode);
+
         if(operationType == OperationType.INSERT_IGNORE){
             kuduSession.setIgnoreAllDuplicateRows(true);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0c0c3341/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 53fc678..313e49b 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -43,7 +43,7 @@ import java.util.Set;
 @EventDriven
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS", "record"})
 @CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
         "to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." +
         " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure")
@@ -58,6 +58,8 @@ public class PutKudu extends AbstractKudu {
         properties.add(SKIP_HEAD_LINE);
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
+        properties.add(FLUSH_MODE);
+        properties.add(BATCH_SIZE);
 
         return properties;
     }