You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/08/04 18:54:32 UTC
[2/2] nifi git commit: NIFI-4024 Ensuring InputStream gets closed and
cleaning up complex field handling
NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling
This closes #1961.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f8f1cc8d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f8f1cc8d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f8f1cc8d
Branch: refs/heads/master
Commit: f8f1cc8d0df4893f35d7cbe3413b7a3dd521bd18
Parents: 496a32e
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Aug 3 13:00:25 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Aug 4 14:54:05 2017 -0400
----------------------------------------------------------------------
.../org/apache/nifi/hbase/PutHBaseRecord.java | 104 +++++++++++--------
1 file changed, 61 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/f8f1cc8d/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
index 66f95e0..8aa84ea 100755
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -146,7 +147,6 @@ public class PutHBaseRecord extends AbstractPutHBase {
return columns;
}
- private RecordReaderFactory recordParserFactory;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@@ -176,7 +176,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
}
PutFlowFile last = null;
- try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
+ try (final InputStream in = session.read(flowFile);
+ final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
Record record;
if (startIndex >= 0) {
while ( index++ < startIndex && (reader.nextRecord()) != null) {}
@@ -242,15 +243,29 @@ public class PutHBaseRecord extends AbstractPutHBase {
byte[] retVal;
if (asString) {
- retVal = clientService.toBytes(record.getAsString(field));
+ switch (fieldType) {
+ case RECORD:
+ case CHOICE:
+ case ARRAY:
+ case MAP:
+ retVal = handleComplexField(record, field, complexFieldStrategy);
+ break;
+ default:
+ final String value = record.getAsString(field);
+ retVal = clientService.toBytes(value);
+ break;
+ }
} else {
switch (fieldType) {
+ case RECORD:
+ case CHOICE:
+ case ARRAY:
+ case MAP:
+ retVal = handleComplexField(record, field, complexFieldStrategy);
+ break;
case BOOLEAN:
retVal = clientService.toBytes(record.getAsBoolean(field));
break;
- case CHAR:
- retVal = clientService.toBytes(record.getAsString(field));
- break;
case DOUBLE:
retVal = clientService.toBytes(record.getAsDouble(field));
break;
@@ -264,31 +279,37 @@ public class PutHBaseRecord extends AbstractPutHBase {
retVal = clientService.toBytes(record.getAsLong(field));
break;
default:
- retVal = null;
- switch (complexFieldStrategy) {
- case FAIL_VALUE:
- getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
- throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
- case WARN_VALUE:
- getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
- break;
- case TEXT_VALUE:
- retVal = clientService.toBytes(record.getAsString(field));
- break;
- case IGNORE_VALUE:
- // silently skip
- break;
- default:
- break;
- }
+ final String value = record.getAsString(field);
+ retVal = clientService.toBytes(value);
+ break;
}
}
return retVal;
}
+ private byte[] handleComplexField(Record record, String field, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+ switch (complexFieldStrategy) {
+ case FAIL_VALUE:
+ getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
+ throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
+ case WARN_VALUE:
+ getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
+ return null;
+ case TEXT_VALUE:
+ final String value = record.getAsString(field);
+ return clientService.toBytes(value);
+ case IGNORE_VALUE:
+ // silently skip
+ return null;
+ default:
+ return null;
+ }
+ }
+
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
- String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+ String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy)
+ throws PutCreationFailedInvokedException {
PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -296,29 +317,26 @@ public class PutHBaseRecord extends AbstractPutHBase {
final byte[] fam = clientService.toBytes(columnFamily);
- //try {
- if (record != null) {
- List<PutColumn> columns = new ArrayList<>();
- for (String name : schema.getFieldNames()) {
- if (name.equals(rowFieldName)) {
- continue;
- }
- columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
- schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
- }
- String rowIdValue = record.getAsString(rowFieldName);
- if (rowIdValue == null) {
- throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
+ if (record != null) {
+ List<PutColumn> columns = new ArrayList<>();
+ for (String name : schema.getFieldNames()) {
+ if (name.equals(rowFieldName)) {
+ continue;
}
- byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
- retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
+ final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
+ if (fieldValueBytes != null) {
+ columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes));
+ }
+ }
+ String rowIdValue = record.getAsString(rowFieldName);
+ if (rowIdValue == null) {
+ throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
}
+ byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
-/* } catch (Exception ex) {
- getLogger().error("Error running createPuts", ex);
- throw new RuntimeException(ex);
- }*/
+ retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
+ }
return retVal;
}