You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/08/04 18:54:32 UTC

[2/2] nifi git commit: NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling

NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling

This closes #1961.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f8f1cc8d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f8f1cc8d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f8f1cc8d

Branch: refs/heads/master
Commit: f8f1cc8d0df4893f35d7cbe3413b7a3dd521bd18
Parents: 496a32e
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Aug 3 13:00:25 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Aug 4 14:54:05 2017 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/hbase/PutHBaseRecord.java   | 104 +++++++++++--------
 1 file changed, 61 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f8f1cc8d/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
index 66f95e0..8aa84ea 100755
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -146,7 +147,6 @@ public class PutHBaseRecord extends AbstractPutHBase {
         return columns;
     }
 
-    private RecordReaderFactory recordParserFactory;
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@@ -176,7 +176,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
         }
 
         PutFlowFile last  = null;
-        try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) {
+        try (final InputStream in = session.read(flowFile);
+             final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
             Record record;
             if (startIndex >= 0) {
                 while ( index++ < startIndex && (reader.nextRecord()) != null) {}
@@ -242,15 +243,29 @@ public class PutHBaseRecord extends AbstractPutHBase {
         byte[] retVal;
 
         if (asString) {
-            retVal = clientService.toBytes(record.getAsString(field));
+            switch (fieldType) {
+                case RECORD:
+                case CHOICE:
+                case ARRAY:
+                case MAP:
+                    retVal = handleComplexField(record, field, complexFieldStrategy);
+                    break;
+                default:
+                    final String value = record.getAsString(field);
+                    retVal = clientService.toBytes(value);
+                    break;
+            }
         } else {
             switch (fieldType) {
+                case RECORD:
+                case CHOICE:
+                case ARRAY:
+                case MAP:
+                    retVal = handleComplexField(record, field, complexFieldStrategy);
+                    break;
                 case BOOLEAN:
                     retVal = clientService.toBytes(record.getAsBoolean(field));
                     break;
-                case CHAR:
-                    retVal = clientService.toBytes(record.getAsString(field));
-                    break;
                 case DOUBLE:
                     retVal = clientService.toBytes(record.getAsDouble(field));
                     break;
@@ -264,31 +279,37 @@ public class PutHBaseRecord extends AbstractPutHBase {
                     retVal = clientService.toBytes(record.getAsLong(field));
                     break;
                 default:
-                    retVal = null;
-                    switch (complexFieldStrategy) {
-                        case FAIL_VALUE:
-                            getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
-                            throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
-                        case WARN_VALUE:
-                            getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
-                            break;
-                        case TEXT_VALUE:
-                            retVal = clientService.toBytes(record.getAsString(field));
-                            break;
-                        case IGNORE_VALUE:
-                            // silently skip
-                            break;
-                        default:
-                            break;
-                    }
+                    final String value = record.getAsString(field);
+                    retVal = clientService.toBytes(value);
+                    break;
             }
         }
 
         return retVal;
     }
 
+    private byte[] handleComplexField(Record record, String field, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+        switch (complexFieldStrategy) {
+            case FAIL_VALUE:
+                getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
+                throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
+            case WARN_VALUE:
+                getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
+                return null;
+            case TEXT_VALUE:
+                final String value = record.getAsString(field);
+                return clientService.toBytes(value);
+            case IGNORE_VALUE:
+                // silently skip
+                return null;
+            default:
+                return null;
+        }
+    }
+
     protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
-                                    String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException {
+                                    String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy)
+            throws PutCreationFailedInvokedException {
         PutFlowFile retVal = null;
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
 
@@ -296,29 +317,26 @@ public class PutHBaseRecord extends AbstractPutHBase {
 
         final byte[] fam  = clientService.toBytes(columnFamily);
 
-        //try {
-            if (record != null) {
-                List<PutColumn> columns = new ArrayList<>();
-                for (String name : schema.getFieldNames()) {
-                    if (name.equals(rowFieldName)) {
-                        continue;
-                    }
-                    columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
-                            schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
-                }
-                String rowIdValue = record.getAsString(rowFieldName);
-                if (rowIdValue == null) {
-                    throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
+        if (record != null) {
+            List<PutColumn> columns = new ArrayList<>();
+            for (String name : schema.getFieldNames()) {
+                if (name.equals(rowFieldName)) {
+                    continue;
                 }
-                byte[] rowId =  getRow(rowIdValue, rowEncodingStrategy);
 
-                retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
+                final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
+                if (fieldValueBytes != null) {
+                    columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes));
+                }
+            }
+            String rowIdValue = record.getAsString(rowFieldName);
+            if (rowIdValue == null) {
+                throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
             }
+            byte[] rowId =  getRow(rowIdValue, rowEncodingStrategy);
 
-/*        } catch (Exception ex) {
-            getLogger().error("Error running createPuts", ex);
-            throw new RuntimeException(ex);
-        }*/
+            retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
+        }
 
         return retVal;
     }