You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/11/14 05:00:23 UTC
nifi git commit: NIFI-4578 Added strategy for dealing with nullable fields in PutHBaseRecord.
Repository: nifi
Updated Branches:
refs/heads/master 9e9c129c2 -> ad2d12a20
NIFI-4578 Added strategy for dealing with nullable fields in PutHBaseRecord.
NIFI-4578 Added changes from code review.
This closes #2256.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ad2d12a2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ad2d12a2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ad2d12a2
Branch: refs/heads/master
Commit: ad2d12a20cdfdd93e99be84a52918eaf39269063
Parents: 9e9c129
Author: Mike Thomsen <mi...@gmail.com>
Authored: Tue Nov 7 11:24:46 2017 -0500
Committer: Koji Kawamura <ij...@apache.org>
Committed: Tue Nov 14 13:59:30 2017 +0900
----------------------------------------------------------------------
.../org/apache/nifi/hbase/PutHBaseRecord.java | 37 ++++++++++++++++++--
1 file changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ad2d12a2/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
index 3e6d624..b4de3c6 100755
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -127,6 +127,20 @@ public class PutHBaseRecord extends AbstractPutHBase {
.defaultValue("1000")
.build();
+ protected static final AllowableValue NULL_FIELD_EMPTY = new AllowableValue("empty-bytes", "Empty Bytes",
+ "Use empty bytes. This can be used to overwrite existing fields or to put an empty placeholder value if you want" +
+ " every field to be present even if it has a null value.");
+ protected static final AllowableValue NULL_FIELD_SKIP = new AllowableValue("skip-field", "Skip Field", "Skip the field (don't process it at all).");
+
+ protected static final PropertyDescriptor NULL_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+ .name("hbase-record-null-field-strategy")
+ .displayName("Null Field Strategy")
+ .required(true)
+ .defaultValue("skip-field")
+ .description("Handle null field values as either an empty string or skip them altogether.")
+ .allowableValues(NULL_FIELD_EMPTY, NULL_FIELD_SKIP)
+ .build();
+
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -135,6 +149,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
properties.add(TABLE_NAME);
properties.add(ROW_FIELD_NAME);
properties.add(ROW_ID_ENCODING_STRATEGY);
+ properties.add(NULL_FIELD_STRATEGY);
properties.add(COLUMN_FAMILY);
properties.add(TIMESTAMP_FIELD_NAME);
properties.add(BATCH_SIZE);
@@ -201,6 +216,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
while ((record = reader.nextRecord()) != null) {
PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
+ if (putFlowFile.getColumns().size() == 0) {
+ continue;
+ }
flowFiles.add(putFlowFile);
index++;
@@ -220,7 +238,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
if (!failed) {
- sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ if (columns > 0) {
+ sendProvenance(session, flowFile, columns, System.nanoTime() - start, last);
+ }
flowFile = session.removeAttribute(flowFile, "restart.index");
session.transfer(flowFile, REL_SUCCESS);
} else {
@@ -317,12 +337,15 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
}
+ static final byte[] EMPTY = "".getBytes();
+
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
String complexFieldStrategy)
throws PutCreationFailedInvokedException {
PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String nullStrategy = context.getProperty(NULL_FIELD_STRATEGY).getValue();
boolean asString = STRING_ENCODING_VALUE.equals(fieldEncodingStrategy);
@@ -350,7 +373,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
continue;
}
- final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
+ Object val = record.getValue(name);
+ final byte[] fieldValueBytes;
+ if (val == null && nullStrategy.equals(NULL_FIELD_SKIP.getValue())) {
+ continue;
+ } else if (val == null && nullStrategy.equals(NULL_FIELD_EMPTY.getValue())) {
+ fieldValueBytes = EMPTY;
+ } else {
+ fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
+ }
+
+
if (fieldValueBytes != null) {
columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
}