You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/11/14 05:00:23 UTC

nifi git commit: NIFI-4578 Added strategy for dealing with nullable fields in PutHBaseRecord.

Repository: nifi
Updated Branches:
  refs/heads/master 9e9c129c2 -> ad2d12a20


NIFI-4578 Added strategy for dealing with nullable fields in PutHBaseRecord.

NIFI-4578 Added changes from code review.

This closes #2256.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad2d12a2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad2d12a2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad2d12a2

Branch: refs/heads/master
Commit: ad2d12a20cdfdd93e99be84a52918eaf39269063
Parents: 9e9c129
Author: Mike Thomsen <mi...@gmail.com>
Authored: Tue Nov 7 11:24:46 2017 -0500
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue Nov 14 13:59:30 2017 +0900

----------------------------------------------------------------------
 .../org/apache/nifi/hbase/PutHBaseRecord.java   | 37 ++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ad2d12a2/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
index 3e6d624..b4de3c6 100755
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -127,6 +127,20 @@ public class PutHBaseRecord extends AbstractPutHBase {
             .defaultValue("1000")
             .build();
 
+    protected static final AllowableValue NULL_FIELD_EMPTY = new AllowableValue("empty-bytes", "Empty Bytes",
+            "Use empty bytes. This can be used to overwrite existing fields or to put an empty placeholder value if you want" +
+                    " every field to be present even if it has a null value.");
+    protected static final AllowableValue NULL_FIELD_SKIP  = new AllowableValue("skip-field", "Skip Field", "Skip the field (don't process it at all).");
+
+    protected static final PropertyDescriptor NULL_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("hbase-record-null-field-strategy")
+            .displayName("Null Field Strategy")
+            .required(true)
+            .defaultValue("skip-field")
+            .description("Handle null field values as either an empty string or skip them altogether.")
+            .allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP)
+            .build();
+
     @Override
     public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -135,6 +149,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
         properties.add(TABLE_NAME);
         properties.add(ROW_FIELD_NAME);
         properties.add(ROW_ID_ENCODING_STRATEGY);
+        properties.add(NULL_FIELD_STRATEGY);
         properties.add(COLUMN_FAMILY);
         properties.add(TIMESTAMP_FIELD_NAME);
         properties.add(BATCH_SIZE);
@@ -201,6 +216,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
             while ((record = reader.nextRecord()) != null) {
                 PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
                         timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
+                if (putFlowFile.getColumns().size() == 0) {
+                    continue;
+                }
                 flowFiles.add(putFlowFile);
                 index++;
 
@@ -220,7 +238,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
         }
 
         if (!failed) {
-            sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+            if (columns > 0) {
+                sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+            }
             flowFile = session.removeAttribute(flowFile, "restart.index");
             session.transfer(flowFile, REL_SUCCESS);
         } else {
@@ -317,12 +337,15 @@ public class PutHBaseRecord extends AbstractPutHBase {
         }
     }
 
+    static final byte[] EMPTY = "".getBytes();
+
     protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
                                     String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
                                     String complexFieldStrategy)
             throws PutCreationFailedInvokedException {
         PutFlowFile retVal = null;
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String nullStrategy = context.getProperty(NULL_FIELD_STRATEGY).getValue();
 
         boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
 
@@ -350,7 +373,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
                     continue;
                 }
 
-                final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
+                Object val = record.getValue(name);
+                final byte[] fieldValueBytes;
+                if (val == null && nullStrategy.equals(NULL_FIELD_SKIP.getValue())) {
+                    continue;
+                } else if (val == null && nullStrategy.equals(NULL_FIELD_EMPTY.getValue())) {
+                    fieldValueBytes = EMPTY;
+                } else {
+                    fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
+                }
+
+
                 if (fieldValueBytes != null) {
                     columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
                 }