You are viewing a plain-text version of this content. The canonical link to it was provided as a hyperlink, which is not preserved in this plain-text version.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/11/07 23:25:20 UTC
[nifi] branch master updated: NIFI-6680: This closes #3748. Nifi PutKudu processor - Convert record field names to lowercase
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new eb366c8 NIFI-6680: This closes #3748. Nifi PutKudu processor - Convert record field names to lowercase
eb366c8 is described below
commit eb366c8d0ab6a071f0f593fcbf227856243fdd5b
Author: kevinmccarthy <00...@gmail.com>
AuthorDate: Mon Sep 16 15:52:24 2019 -0500
NIFI-6680: This closes #3748. Nifi PutKudu processor - Convert record field names to lowercase
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../processors/kudu/AbstractKuduProcessor.java | 47 +++++++++++----------
.../org/apache/nifi/processors/kudu/PutKudu.java | 27 ++++++++----
.../apache/nifi/processors/kudu/MockPutKudu.java | 8 ++--
.../apache/nifi/processors/kudu/TestPutKudu.java | 49 +++++++++++++++++-----
4 files changed, 87 insertions(+), 44 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 6f946f4..fbf931b 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -166,15 +166,18 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
}
@VisibleForTesting
- protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, Boolean ignoreNull) {
-
- for (String colName : fieldNames) {
+ protected void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
+ for (String recordFieldName : fieldNames) {
+ String colName = recordFieldName;
+ if (lowercaseFields) {
+ colName = colName.toLowerCase();
+ }
int colIdx = this.getColumnIndex(schema, colName);
if (colIdx != -1) {
ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
Type colType = colSchema.getType();
- if (record.getValue(colName) == null) {
+ if (record.getValue(recordFieldName) == null) {
if (schema.getColumnByIndex(colIdx).isKey()) {
throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
} else if(!schema.getColumnByIndex(colIdx).isNullable()) {
@@ -188,37 +191,37 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
} else {
switch (colType.getDataType(colSchema.getTypeAttributes())) {
case BOOL:
- row.addBoolean(colIdx, record.getAsBoolean(colName));
+ row.addBoolean(colIdx, record.getAsBoolean(recordFieldName));
break;
case FLOAT:
- row.addFloat(colIdx, record.getAsFloat(colName));
+ row.addFloat(colIdx, record.getAsFloat(recordFieldName));
break;
case DOUBLE:
- row.addDouble(colIdx, record.getAsDouble(colName));
+ row.addDouble(colIdx, record.getAsDouble(recordFieldName));
break;
case BINARY:
- row.addBinary(colIdx, record.getAsString(colName).getBytes());
+ row.addBinary(colIdx, record.getAsString(recordFieldName).getBytes());
break;
case INT8:
- row.addByte(colIdx, record.getAsInt(colName).byteValue());
+ row.addByte(colIdx, record.getAsInt(recordFieldName).byteValue());
break;
case INT16:
- row.addShort(colIdx, record.getAsInt(colName).shortValue());
+ row.addShort(colIdx, record.getAsInt(recordFieldName).shortValue());
break;
case INT32:
- row.addInt(colIdx, record.getAsInt(colName));
+ row.addInt(colIdx, record.getAsInt(recordFieldName));
break;
case INT64:
case UNIXTIME_MICROS:
- row.addLong(colIdx, record.getAsLong(colName));
+ row.addLong(colIdx, record.getAsLong(recordFieldName));
break;
case STRING:
- row.addString(colIdx, record.getAsString(colName));
+ row.addString(colIdx, record.getAsString(recordFieldName));
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
- row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName)));
+ row.addDecimal(colIdx, new BigDecimal(record.getAsString(recordFieldName)));
break;
default:
throw new IllegalStateException(String.format("unknown column type %s", colType));
@@ -236,27 +239,27 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
}
}
- protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Upsert upsert = kuduTable.newUpsert();
- buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull);
+ buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return upsert;
}
- protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Insert insert = kuduTable.newInsert();
- buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull);
+ buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return insert;
}
- protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Delete delete = kuduTable.newDelete();
- buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull);
+ buildPartialRow(kuduTable.getSchema(), delete.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return delete;
}
- protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Update update = kuduTable.newUpdate();
- buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull);
+ buildPartialRow(kuduTable.getSchema(), update.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
return update;
}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1b47337..6476886 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -99,6 +99,15 @@ public class PutKudu extends AbstractKuduProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ protected static final PropertyDescriptor LOWERCASE_FIELD_NAMES = new Builder()
+ .name("Lowercase Field Names")
+ .description("Convert column names to lowercase when finding index of Kudu table columns")
+ .defaultValue("false")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.displayName("Kudu Operation Type")
@@ -179,6 +188,7 @@ public class PutKudu extends AbstractKuduProcessor {
properties.add(TABLE_NAME);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SKIP_HEAD_LINE);
+ properties.add(LOWERCASE_FIELD_NAMES);
properties.add(RECORD_READER);
properties.add(INSERT_OPERATION);
properties.add(FLUSH_MODE);
@@ -247,16 +257,17 @@ public class PutKudu extends AbstractKuduProcessor {
for (FlowFile flowFile : flowFiles) {
operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).evaluateAttributeExpressions(flowFile).getValue());
Boolean ignoreNull = Boolean.valueOf(context.getProperty(IGNORE_NULL).evaluateAttributeExpressions(flowFile).getValue());
+ Boolean lowercaseFields = Boolean.valueOf(context.getProperty(LOWERCASE_FIELD_NAMES).evaluateAttributeExpressions(flowFile).getValue());
try (final InputStream in = session.read(flowFile);
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
- final List<String> fieldNames = recordReader.getSchema().getFieldNames();
final RecordSet recordSet = recordReader.createRecordSet();
+ final List<String> fieldNames = recordReader.getSchema().getFieldNames();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final KuduTable kuduTable = kuduClient.openTable(tableName);
Record record = recordSet.next();
while (record != null) {
- Operation operation = getKuduOperationType(operationType, record, fieldNames, ignoreNull, kuduTable);
+ Operation operation = getKuduOperationType(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
// We keep track of mappings between Operations and their origins,
// so that we know which FlowFiles should be marked failure after buffered flush.
operationFlowFileMap.put(operation, flowFile);
@@ -343,18 +354,18 @@ public class PutKudu extends AbstractKuduProcessor {
return kuduSession;
}
- private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, KuduTable kuduTable) {
+ private Operation getKuduOperationType(OperationType operationType, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields, KuduTable kuduTable) {
switch (operationType) {
case DELETE:
- return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull);
+ return deleteRecordFromKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case INSERT:
- return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+ return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case INSERT_IGNORE:
- return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+ return insertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case UPSERT:
- return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+ return upsertRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
case UPDATE:
- return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull);
+ return updateRecordToKudu(kuduTable, record, fieldNames, ignoreNull, lowercaseFields);
default:
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index b5ce71f..f416e96 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -62,23 +62,23 @@ public class MockPutKudu extends PutKudu {
}
@Override
- protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Insert insert = insertQueue.poll();
return insert != null ? insert : mock(Insert.class);
}
@Override
- protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
return mock(Upsert.class);
}
@Override
- protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Delete deleteRecordFromKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
return mock(Delete.class);
}
@Override
- protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull) {
+ protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
return mock(Update.class);
}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index ac7c0cd..9be6952 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -27,6 +27,7 @@ import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
+import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
@@ -103,6 +104,7 @@ public class TestPutKudu {
testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS);
testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE);
testRunner.setProperty(PutKudu.IGNORE_NULL, "true");
+ testRunner.setProperty(PutKudu.LOWERCASE_FIELD_NAMES, "true");
testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory");
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString());
}
@@ -366,27 +368,51 @@ public class TestPutKudu {
@Test
public void testBuildRow() {
- buildPartialRow((long) 1, "foo", (short) 10);
+ buildPartialRow((long) 1, "foo", (short) 10, "id", "id", false);
}
@Test
public void testBuildPartialRowNullable() {
- buildPartialRow((long) 1, null, (short) 10);
+ buildPartialRow((long) 1, null, (short) 10, "id", "id", false);
}
@Test(expected = IllegalArgumentException.class)
public void testBuildPartialRowNullPrimaryKey() {
- buildPartialRow(null, "foo", (short) 10);
+ buildPartialRow(null, "foo", (short) 10, "id", "id", false);
}
@Test(expected = IllegalArgumentException.class)
public void testBuildPartialRowNotNullable() {
- buildPartialRow((long) 1, "foo", null);
+ buildPartialRow((long) 1, "foo", null, "id", "id", false);
}
- private void buildPartialRow(Long id, String name, Short age) {
+ @Test
+ public void testBuildPartialRowLowercaseFields() {
+ PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", true);
+ row.getLong("id");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildPartialRowLowercaseFieldsFalse() {
+ PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "id", "ID", false);
+ row.getLong("id");
+ }
+
+ @Test
+ public void testBuildPartialRowLowercaseFieldsKuduUpper() {
+ PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", false);
+ row.getLong("ID");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBuildPartialRowLowercaseFieldsKuduUpperFail() {
+ PartialRow row = buildPartialRow((long) 1, "foo", (short) 10, "ID", "ID", true);
+ row.getLong("ID");
+ }
+
+ private PartialRow buildPartialRow(Long id, String name, Short age, String kuduIdName, String recordIdName, Boolean lowercaseFields) {
final Schema kuduSchema = new Schema(Arrays.asList(
- new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build(),
+ new ColumnSchema.ColumnSchemaBuilder(kuduIdName, Type.INT64).key(true).build(),
new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(),
new ColumnSchema.ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(),
new ColumnSchema.ColumnSchemaBuilder("updated_at", Type.UNIXTIME_MICROS).nullable(false).build(),
@@ -395,25 +421,28 @@ public class TestPutKudu {
).build()));
final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("id", RecordFieldType.BIGINT.getDataType()),
+ new RecordField(recordIdName, RecordFieldType.BIGINT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("age", RecordFieldType.SHORT.getDataType()),
new RecordField("updated_at", RecordFieldType.BIGINT.getDataType()),
new RecordField("score", RecordFieldType.LONG.getDataType())));
Map<String, Object> values = new HashMap<>();
- values.put("id", id);
+ PartialRow row = kuduSchema.newPartialRow();
+ values.put(recordIdName, id);
values.put("name", name);
values.put("age", age);
values.put("updated_at", System.currentTimeMillis() * 1000);
values.put("score", 10000L);
processor.buildPartialRow(
kuduSchema,
- kuduSchema.newPartialRow(),
+ row,
new MapRecord(schema, values),
schema.getFieldNames(),
- true
+ true,
+ lowercaseFields
);
+ return row;
}
private Tuple<Insert, OperationResponse> insert(boolean success) {