You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/11/07 23:25:20 UTC

[nifi] branch master updated: NIFI-6680: This closes #3748. Nifi PutKudu processor - Convert record field names to lowercase

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb366c8  NIFI-6680: This closes #3748. Nifi PutKudu processor - Convert record field names to lowercase
eb366c8 is described below

commit eb366c8d0ab6a071f0f593fcbf227856243fdd5b
Author: kevinmccarthy <00...@gmail.com>
AuthorDate: Mon Sep 16 15:52:24 2019 -0500

    NIFI-6680: This closes #3748. Nifi PutKudu processor - Convert record field names to lowercase
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../processors/kudu/AbstractKuduProcessor.java     | 47 +++++++++++----------
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 27 ++++++++----
 .../apache/nifi/processors/kudu/MockPutKudu.java   |  8 ++--
 .../apache/nifi/processors/kudu/TestPutKudu.java   | 49 +++++++++++++++++-----
 4 files changed, 87 insertions(+), 44 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 6f946f4..fbf931b 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -166,15 +166,18 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
     }
 
     @VisibleForTesting
-    protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, Boolean ignoreNull) {
-
-        for (String colName : fieldNames) {
+    protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
+        for (String recordFieldName : fieldNames) {
+            String colName = recordFieldName;
+            if (lowercaseFields) {
+                colName = colName.toLowerCase();
+            }
             int colIdx = this.getColumnIndex(schema, colName);
             if (colIdx != -1) {
                 ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
                 Type colType = colSchema.getType();
 
-                if (record.getValue(colName) == null) {
+                if (record.getValue(recordFieldName) == null) {
                     if (schema.getColumnByIndex(colIdx).isKey()) {
                         throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
                     } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
@@ -188,37 +191,37 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
                 } else {
                     switch (colType.getDataType(colSchema.getTypeAttributes())) {
                         case BOOL:
-                            row.addBoolean(colIdx, record.getAsBoolean(colName));
+                            row.addBoolean(colIdx, record.getAsBoolean(recordFieldName));
                             break;
                         case FLOAT:
-                            row.addFloat(colIdx, record.getAsFloat(colName));
+                            row.addFloat(colIdx, record.getAsFloat(recordFieldName));
                             break;
                         case DOUBLE:
-                            row.addDouble(colIdx, record.getAsDouble(colName));
+                            row.addDouble(colIdx, record.getAsDouble(recordFieldName));
                             break;
                         case BINARY:
-                            row.addBinary(colIdx, record.getAsString(colName).getBytes());
+                            row.addBinary(colIdx, record.getAsString(recordFieldName).getBytes());
                             break;
                         case INT8:
-                            row.addByte(colIdx, record.getAsInt(colName).byteValue());
+                            row.addByte(colIdx, record.getAsInt(recordFieldName).byteValue());
                             break;
                         case INT16:
-                            row.addShort(colIdx, record.getAsInt(colName).shortValue());
+                            row.addShort(colIdx, record.getAsInt(recordFieldName).shortValue());
                             break;
                         case INT32:
-                            row.addInt(colIdx, record.getAsInt(colName));
+                            row.addInt(colIdx, record.getAsInt(recordFieldName));
                             break;
                         case INT64:
                         case UNIXTIME_MICROS:
-                            row.addLong(colIdx, record.getAsLong(colName));
+                            row.addLong(colIdx, record.getAsLong(recordFieldName));
                             break;
                         case STRING:
-                            row.addString(colIdx, record.getAsString(colName));
+                            row.addString(colIdx, record.getAsString(recordFieldName));
                             break;
                         case DECIMAL32:
                         case DECIMAL64:
                         case DECIMAL128:
-                            row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
+                            row.addDecimal(colIdx, new BigDecimal(record.getAsString(recordFieldName)));
                             break;
                         default:
                             throw new IllegalStateException(String.format("unknown column type %s", colType));
@@ -236,27 +239,27 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         }
     }
 
-    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Upsert upsert = kuduTable.newUpsert();
-        buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull);
+        buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
         return upsert;
     }
 
-    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Insert insert = kuduTable.newInsert();
-        buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull);
+        buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
         return insert;
     }
 
-    protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Delete delete = kuduTable.newDelete();
-        buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull);
+        buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
         return delete;
     }
 
-    protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Update update = kuduTable.newUpdate();
-        buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull);
+        buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
         return update;
     }
 
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1b47337..6476886 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -99,6 +99,15 @@ public class PutKudu extends AbstractKuduProcessor {
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    protected static final PropertyDescriptor LOWERCASE_FIELD_NAMES = new Builder()
+            .name("Lowercase Field Names")
+            .description("Convert column names to lowercase when finding index of Kudu table columns")
+            .defaultValue("false")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
         .name("Insert Operation")
         .displayName("Kudu Operation Type")
@@ -179,6 +188,7 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(TABLE_NAME);
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
         properties.add(SKIP_HEAD_LINE);
+        properties.add(LOWERCASE_FIELD_NAMES);
         properties.add(RECORD_READER);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
@@ -247,16 +257,17 @@ public class PutKudu extends AbstractKuduProcessor {
         for (FlowFile flowFile : flowFiles) {
             operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
             Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
+            Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
             try (final InputStream in = session.read(flowFile);
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
-                final List<String> fieldNames = recordReader.getSchema().getFieldNames();
                 final RecordSet recordSet = recordReader.createRecordSet();
+                final List<String> fieldNames = recordReader.getSchema().getFieldNames();
                 final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final KuduTable kuduTable = kuduClient.openTable(tableName);
 
                 Record record = recordSet.next();
                 while (record != null) {
-                    Operation operation = getKuduOperationType(operationType, record, fieldNames, ignoreNull, kuduTable);
+                    Operation operation = getKuduOperationType(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
                     // We keep track of mappings between Operations and their origins,
                     // so that we know which FlowFiles should be marked failure after buffered flush.
                     operationFlowFileMap.put(operation, flowFile);
@@ -343,18 +354,18 @@ public class PutKudu extends AbstractKuduProcessor {
         return kuduSession;
     }
 
-    private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, KuduTable kuduTable) {
+    private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields, KuduTable kuduTable) {
         switch (operationType) {
             case DELETE:
-                return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull);
+                return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
             case INSERT:
-                return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+                return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
             case INSERT_IGNORE:
-                return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+                return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
             case UPSERT:
-                return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+                return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
             case UPDATE:
-                return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+                return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
             default:
                 throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
         }
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index b5ce71f..f416e96 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -62,23 +62,23 @@ public class MockPutKudu extends PutKudu {
     }
 
     @Override
-    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Insert insert = insertQueue.poll();
         return insert != null ? insert : mock(Insert.class);
     }
 
     @Override
-    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         return mock(Upsert.class);
     }
 
     @Override
-    protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         return mock(Delete.class);
     }
 
     @Override
-    protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+    protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         return mock(Update.class);
     }
 
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index ac7c0cd..9be6952 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -27,6 +27,7 @@ import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.OperationResponse;
 import org.apache.kudu.client.RowError;
 import org.apache.kudu.client.RowErrorsAndOverflowStatus;
+import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.SessionConfiguration.FlushMode;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
@@ -103,6 +104,7 @@ public class TestPutKudu {
         testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
         testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
         testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
+        testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "true");
         testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
         testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString());
     }
@@ -366,27 +368,51 @@ public class TestPutKudu {
 
     @Test
     public void testBuildRow() {
-        buildPartialRow((long) 1, "foo", (short) 10);
+        buildPartialRow((long) 1, "foo", (short) 10, "id", "id", false);
     }
 
     @Test
     public void testBuildPartialRowNullable() {
-        buildPartialRow((long) 1, null, (short) 10);
+        buildPartialRow((long) 1, null, (short) 10, "id", "id",  false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowNullPrimaryKey() {
-        buildPartialRow(null, "foo", (short) 10);
+        buildPartialRow(null, "foo", (short) 10, "id", "id",  false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testBuildPartialRowNotNullable() {
-        buildPartialRow((long) 1, "foo", null);
+        buildPartialRow((long) 1, "foo", null, "id", "id", false);
     }
 
-    private void buildPartialRow(Long id, String name, Short age) {
+    @Test
+    public void testBuildPartialRowLowercaseFields() {
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", true);
+        row.getLong("id");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildPartialRowLowercaseFieldsFalse() {
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", false);
+        row.getLong("id");
+    }
+
+    @Test
+    public void testBuildPartialRowLowercaseFieldsKuduUpper() {
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", false);
+        row.getLong("ID");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
+        PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", true);
+        row.getLong("ID");
+    }
+
+    private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, Boolean lowercaseFields) {
         final Schema kuduSchema = new Schema(Arrays.asList(
-            new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
+            new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
             new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
             new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
@@ -395,25 +421,28 @@ public class TestPutKudu {
             ).build()));
 
         final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("id", RecordFieldType.BIGINT.getDataType()),
+            new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()),
             new RecordField("name", RecordFieldType.STRING.getDataType()),
             new RecordField("age", RecordFieldType.SHORT.getDataType()),
             new RecordField("updated_at", RecordFieldType.BIGINT.getDataType()),
             new RecordField("score", RecordFieldType.LONG.getDataType())));
 
         Map<String, Object> values = new HashMap<>();
-        values.put("id", id);
+        PartialRow row = kuduSchema.newPartialRow();
+        values.put(recordIdName, id);
         values.put("name", name);
         values.put("age", age);
         values.put("updated_at", System.currentTimeMillis() * 1000);
         values.put("score", 10000L);
         processor.buildPartialRow(
             kuduSchema,
-            kuduSchema.newPartialRow(),
+            row,
             new MapRecord(schema, values),
             schema.getFieldNames(),
-                true
+                true,
+            lowercaseFields
         );
+        return row;
     }
 
     private Tuple<Insert, OperationResponse> insert(boolean success) {