Posted to commits@nifi.apache.org by ma...@apache.org on 2020/10/27 23:03:48 UTC
[nifi] branch main updated: NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7496899 NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu
7496899 is described below
commit 74968991d5034c96c481386f76929e9a49cab8a1
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 27 11:40:12 2020 -0400
NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu
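As background for the change below, the new <Operation RecordPath> property lets each incoming Record name its own Kudu operation instead of relying on the processor-wide <Kudu Operation Type>. The following is a minimal, hand-written sketch of how such a path resolves against a Record; the schema, field names ("op", "id", "name"), and values are hypothetical, and only the RecordPath calls mirror the code added by this commit.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class OperationRecordPathSketch {
    public static void main(final String[] args) {
        // Hypothetical record: { "op": "UPSERT", "id": 1, "name": "alice" }
        final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("op", RecordFieldType.STRING.getDataType()),
            new RecordField("id", RecordFieldType.INT.getDataType()),
            new RecordField("name", RecordFieldType.STRING.getDataType())));
        final Map<String, Object> values = new HashMap<>();
        values.put("op", "UPSERT");
        values.put("id", 1);
        values.put("name", "alice");
        final Record record = new MapRecord(schema, values);
        // With <Operation RecordPath> set to "/op", PutKudu evaluates the path against
        // each Record, expects exactly one distinct value, and maps it to an
        // OperationType via OperationType.valueOf(value.toUpperCase()).
        final RecordPath operationPath = RecordPath.compile("/op");
        final RecordPathResult result = operationPath.evaluate(record);
        final FieldValue selected = result.getSelectedFields()
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("RecordPath yielded no results"));
        System.out.println(selected.getValue()); // prints UPSERT
    }
}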
---
.../nifi-kudu-bundle/nifi-kudu-processors/pom.xml | 5 +
.../processors/kudu/AbstractKuduProcessor.java | 140 ++++++-------
.../org/apache/nifi/processors/kudu/PutKudu.java | 227 +++++++++++++++------
3 files changed, 241 insertions(+), 131 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 34dea1e..bf38d47 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -85,6 +85,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 520fca3..0ae8e40 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -179,8 +179,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
protected KuduClient buildClient(final ProcessContext context) {
final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
- final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
return new KuduClient.KuduClientBuilder(masters)
.defaultOperationTimeoutMs(operationTimeout)
@@ -295,68 +295,72 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
if (lowercaseFields) {
colName = colName.toLowerCase();
}
- int colIdx = this.getColumnIndex(schema, colName);
- if (colIdx != -1) {
- ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
- Type colType = colSchema.getType();
- if (record.getValue(recordFieldName) == null) {
- if (schema.getColumnByIndex(colIdx).isKey()) {
- throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
- } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
- throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
- }
-
- if (!ignoreNull) {
- row.setNull(colName);
- }
- } else {
- Object value = record.getValue(recordFieldName);
- switch (colType) {
- case BOOL:
- row.addBoolean(colIdx, DataTypeUtils.toBoolean(value, recordFieldName));
- break;
- case INT8:
- row.addByte(colIdx, DataTypeUtils.toByte(value, recordFieldName));
- break;
- case INT16:
- row.addShort(colIdx, DataTypeUtils.toShort(value, recordFieldName));
- break;
- case INT32:
- row.addInt(colIdx, DataTypeUtils.toInteger(value, recordFieldName));
- break;
- case INT64:
- row.addLong(colIdx, DataTypeUtils.toLong(value, recordFieldName));
- break;
- case UNIXTIME_MICROS:
- DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
- Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
- () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
- row.addTimestamp(colIdx, timestamp);
- break;
- case STRING:
- row.addString(colIdx, DataTypeUtils.toString(value, recordFieldName));
- break;
- case BINARY:
- row.addBinary(colIdx, DataTypeUtils.toString(value, recordFieldName).getBytes());
- break;
- case FLOAT:
- row.addFloat(colIdx, DataTypeUtils.toFloat(value, recordFieldName));
- break;
- case DOUBLE:
- row.addDouble(colIdx, DataTypeUtils.toDouble(value, recordFieldName));
- break;
- case DECIMAL:
- row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
- break;
- case VARCHAR:
- row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
- break;
- case DATE:
- row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
- break;
- default:
- throw new IllegalStateException(String.format("unknown column type %s", colType));
- }
+
+ if (!schema.hasColumn(colName)) {
+ continue;
+ }
+
+ final int columnIndex = schema.getColumnIndex(colName);
+ final ColumnSchema colSchema = schema.getColumnByIndex(columnIndex);
+ final Type colType = colSchema.getType();
+
+ if (record.getValue(recordFieldName) == null) {
+ if (schema.getColumnByIndex(columnIndex).isKey()) {
+ throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
+ } else if(!schema.getColumnByIndex(columnIndex).isNullable()) {
+ throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
+ }
+
+ if (!ignoreNull) {
+ row.setNull(colName);
+ }
+ } else {
+ Object value = record.getValue(recordFieldName);
+ switch (colType) {
+ case BOOL:
+ row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
+ break;
+ case INT8:
+ row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
+ break;
+ case INT16:
+ row.addShort(columnIndex, DataTypeUtils.toShort(value, recordFieldName));
+ break;
+ case INT32:
+ row.addInt(columnIndex, DataTypeUtils.toInteger(value, recordFieldName));
+ break;
+ case INT64:
+ row.addLong(columnIndex, DataTypeUtils.toLong(value, recordFieldName));
+ break;
+ case UNIXTIME_MICROS:
+ DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
+ Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
+ () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
+ row.addTimestamp(columnIndex, timestamp);
+ break;
+ case STRING:
+ row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+ break;
+ case BINARY:
+ row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes());
+ break;
+ case FLOAT:
+ row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
+ break;
+ case DOUBLE:
+ row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
+ break;
+ case DECIMAL:
+ row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
+ break;
+ case VARCHAR:
+ row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+ break;
+ case DATE:
+ row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
+ break;
+ default:
+ throw new IllegalStateException(String.format("unknown column type %s", colType));
}
}
}
@@ -425,14 +429,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
return alterTable;
}
- private int getColumnIndex(Schema columns, String colName) {
- try {
- return columns.getColumnIndex(colName);
- } catch (Exception ex) {
- return -1;
- }
- }
-
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
Upsert upsert = kuduTable.newUpsert();
buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 3e8e199..c811b6b 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -40,34 +40,42 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSet;
import javax.security.auth.login.LoginException;
-import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
@EventDriven
@SupportsBatching
@@ -86,7 +94,7 @@ public class PutKudu extends AbstractKuduProcessor {
.description("The name of the Kudu Table to put data into")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor RECORD_READER = new Builder()
@@ -113,7 +121,7 @@ public class PutKudu extends AbstractKuduProcessor {
.description("Convert column names to lowercase when finding index of Kudu table columns")
.defaultValue("false")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
@@ -123,10 +131,31 @@ public class PutKudu extends AbstractKuduProcessor {
"are encountered, the Kudu table will be altered to include new columns for those fields.")
.defaultValue("false")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
+ .name("Data RecordPath")
+ .displayName("Data RecordPath")
+ .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
+ " Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.")
+ .required(false)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(NONE)
+ .build();
+
+ static final PropertyDescriptor OPERATION_RECORD_PATH = new Builder()
+ .name("Operation RecordPath")
+ .displayName("Operation RecordPath")
+ .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
+ "RecordPath must evaluate to one of hte valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
+ " will be ignored.")
+ .required(false)
+ .addValidator(new RecordPathValidator())
+ .expressionLanguageSupported(NONE)
+ .build();
+
protected static final Validator OperationTypeValidator = new Validator() {
@Override
public ValidationResult validate(String subject, String value, ValidationContext context) {
@@ -156,9 +185,10 @@ public class PutKudu extends AbstractKuduProcessor {
.displayName("Kudu Operation Type")
.description("Specify operationType for this processor.\n" +
"Valid values are: " +
- Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")))
+ Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")) +
+ ". This Property will be ignored if the <Operation RecordPath> property is set.")
.defaultValue(OperationType.INSERT.toString())
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.addValidator(OperationTypeValidator)
.build();
@@ -184,7 +214,7 @@ public class PutKudu extends AbstractKuduProcessor {
.defaultValue("1")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor BATCH_SIZE = new Builder()
@@ -196,7 +226,7 @@ public class PutKudu extends AbstractKuduProcessor {
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.createLongValidator(1, 100000, true))
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
protected static final PropertyDescriptor IGNORE_NULL = new Builder()
@@ -205,7 +235,7 @@ public class PutKudu extends AbstractKuduProcessor {
.defaultValue("false")
.required(true)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
protected static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -220,9 +250,11 @@ public class PutKudu extends AbstractKuduProcessor {
public static final String RECORD_COUNT_ATTR = "record.count";
// Properties set in onScheduled.
- protected int batchSize = 100;
- protected int ffbatch = 1;
- protected SessionConfiguration.FlushMode flushMode;
+ private volatile int batchSize = 100;
+ private volatile int ffbatch = 1;
+ private volatile SessionConfiguration.FlushMode flushMode;
+ private volatile Function<Record, OperationType> recordPathOperationType;
+ private volatile RecordPath dataRecordPath;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -236,6 +268,8 @@ public class PutKudu extends AbstractKuduProcessor {
properties.add(LOWERCASE_FIELD_NAMES);
properties.add(HANDLE_SCHEMA_DRIFT);
properties.add(RECORD_READER);
+ properties.add(DATA_RECORD_PATH);
+ properties.add(OPERATION_RECORD_PATH);
properties.add(INSERT_OPERATION);
properties.add(FLUSH_MODE);
properties.add(FLOWFILE_BATCH_SIZE);
@@ -255,11 +289,22 @@ public class PutKudu extends AbstractKuduProcessor {
}
@OnScheduled
- public void onScheduled(final ProcessContext context) throws IOException, LoginException {
+ public void onScheduled(final ProcessContext context) throws LoginException {
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKerberosUserAndOrKuduClient(context);
+
+ final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
+ if (operationRecordPathValue == null) {
+ recordPathOperationType = null;
+ } else {
+ final RecordPath recordPath = RecordPath.compile(operationRecordPathValue);
+ recordPathOperationType = new RecordPathOperationType(recordPath);
+ }
+
+ final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
+ dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
}
@Override
@@ -301,83 +346,119 @@ public class PutKudu extends AbstractKuduProcessor {
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
- final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
- final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+ final boolean handleSchemaDrift = Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
+ final Function<Record, OperationType> operationTypeFunction;
+ if (recordPathOperationType == null) {
+ final OperationType staticOperationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
+ operationTypeFunction = record -> staticOperationType;
+ } else {
+ operationTypeFunction = recordPathOperationType;
+ }
final RecordSet recordSet = recordReader.createRecordSet();
- final List<String> fieldNames = recordReader.getSchema().getFieldNames();
KuduTable kuduTable = kuduClient.openTable(tableName);
// If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
if (handleSchemaDrift) {
final Schema schema = kuduTable.getSchema();
- Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
- List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
- lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
- .collect(Collectors.toList());
+ final List<RecordField> missing = recordReader.getSchema().getFields().stream()
+ .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+ .collect(Collectors.toList());
+
if (!missing.isEmpty()) {
- getLogger().info("adding {} columns to table '{}' to handle schema drift",
- new Object[]{missing.size(), tableName});
+ getLogger().info("adding {} columns to table '{}' to handle schema drift", new Object[]{missing.size(), tableName});
+
// Add each column one at a time to avoid failing if some of the missing columns
// were created by a concurrent thread or application attempting to handle schema drift.
- for (RecordField field : missing) {
+ for (final RecordField field : missing) {
try {
final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
- } catch (KuduException e) {
+ } catch (final KuduException e) {
// Ignore the exception if the column already exists due to concurrent
// threads or applications attempting to handle schema drift.
if (e.getStatus().isAlreadyPresent()) {
- getLogger().info("column already exists in table '{}' while handling schema drift",
- new Object[]{tableName});
+ getLogger().info("Column already exists in table '{}' while handling schema drift", new Object[]{tableName});
} else {
throw new ProcessException(e);
}
}
}
+
// Re-open the table to get the new schema.
kuduTable = kuduClient.openTable(tableName);
}
}
- // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
- // Because the session is shared across flow files, for batching efficiency, we
- // need to flush when changing to and from INSERT_IGNORE operation types.
- // This should be updated and simplified when KUDU-1563 is completed.
- if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
- flushKuduSession(kuduSession, false, pendingRowErrors);
- kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
- }
- prevOperationType = operationType;
-
Record record = recordSet.next();
- while (record != null) {
- Operation operation = createKuduOperation(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
- // We keep track of mappings between Operations and their origins,
- // so that we know which FlowFiles should be marked failure after buffered flush.
- operationFlowFileMap.put(operation, flowFile);
-
- // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
- // but the buffer is too big" error. This can happen when flush mode is
- // MANUAL_FLUSH and a FlowFile has more than one records.
- if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
- numBuffered = 0;
- flushKuduSession(kuduSession, false, pendingRowErrors);
+ recordReaderLoop: while (record != null) {
+ final OperationType operationType = operationTypeFunction.apply(record);
+
+ final List<Record> dataRecords;
+ if (dataRecordPath == null) {
+ dataRecords = Collections.singletonList(record);
+ } else {
+ final RecordPathResult result = dataRecordPath.evaluate(record);
+ final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
+ if (fieldValues.isEmpty()) {
+ throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
+ }
+
+ for (final FieldValue fieldValue : fieldValues) {
+ final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+ if (fieldType != RecordFieldType.RECORD) {
+ throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
+ " " + fieldType);
+ }
+ }
+
+ dataRecords = new ArrayList<>(fieldValues.size());
+ for (final FieldValue fieldValue : fieldValues) {
+ dataRecords.add((Record) fieldValue.getValue());
+ }
}
- // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
- OperationResponse response = kuduSession.apply(operation);
- if (response != null && response.hasRowError()) {
- // Stop processing the records on the first error.
- // Note that Kudu does not support rolling back of previous operations.
- flowFileFailures.put(flowFile, response.getRowError());
- break;
+ for (final Record dataRecord : dataRecords) {
+ // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
+ // Because the session is shared across flow files, for batching efficiency, we
+ // need to flush when changing to and from INSERT_IGNORE operation types.
+ // This should be updated and simplified when KUDU-1563 is completed.
+ if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
+ flushKuduSession(kuduSession, false, pendingRowErrors);
+ kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
+ }
+ prevOperationType = operationType;
+
+ final List<String> fieldNames = dataRecord.getSchema().getFieldNames();
+ Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
+ // We keep track of mappings between Operations and their origins,
+ // so that we know which FlowFiles should be marked failure after buffered flush.
+ operationFlowFileMap.put(operation, flowFile);
+
+ // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
+ // but the buffer is too big" error. This can happen when flush mode is
+ // MANUAL_FLUSH and a FlowFile has more than one record.
+ if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+ numBuffered = 0;
+ flushKuduSession(kuduSession, false, pendingRowErrors);
+ }
+
+ // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
+ OperationResponse response = kuduSession.apply(operation);
+ if (response != null && response.hasRowError()) {
+ // Stop processing the records on the first error.
+ // Note that Kudu does not support rolling back of previous operations.
+ flowFileFailures.put(flowFile, response.getRowError());
+ break recordReaderLoop;
+ }
+
+ numBuffered++;
+ numRecords.merge(flowFile, 1, Integer::sum);
}
- numBuffered++;
- numRecords.merge(flowFile, 1, Integer::sum);
record = recordSet.next();
}
} catch (Exception ex) {
@@ -460,4 +541,32 @@ public class PutKudu extends AbstractKuduProcessor {
throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
}
}
+
+ private static class RecordPathOperationType implements Function<Record, OperationType> {
+ private final RecordPath recordPath;
+
+ public RecordPathOperationType(final RecordPath recordPath) {
+ this.recordPath = recordPath;
+ }
+
+ @Override
+ public OperationType apply(final Record record) {
+ final RecordPathResult recordPathResult = recordPath.evaluate(record);
+ final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+ if (resultList.isEmpty()) {
+ throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results");
+ }
+
+ if (resultList.size() > 1) {
+ throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record and received multiple distinct results (" + resultList + ")");
+ }
+
+ final String resultValue = String.valueOf(resultList.get(0).getValue());
+ try {
+ return OperationType.valueOf(resultValue.toUpperCase());
+ } catch (final IllegalArgumentException iae) {
+ throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue);
+ }
+ }
+ }
}
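For completeness, a similar hand-written sketch of what the new <Data RecordPath> property does: the path must select one or more fields of type RECORD, and each selected child Record, rather than the whole envelope, is what the row-building logic sees. The envelope shape below ("account" wrapping the row data) is hypothetical; only the RecordPath evaluation mirrors the code in this commit.
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
public class DataRecordPathSketch {
    public static void main(final String[] args) {
        // Hypothetical envelope: { "op": "INSERT", "account": { "id": 42, "name": "bob" } }
        final RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("id", RecordFieldType.INT.getDataType()),
            new RecordField("name", RecordFieldType.STRING.getDataType())));
        final RecordSchema envelopeSchema = new SimpleRecordSchema(Arrays.asList(
            new RecordField("op", RecordFieldType.STRING.getDataType()),
            new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema))));
        final Map<String, Object> accountValues = new HashMap<>();
        accountValues.put("id", 42);
        accountValues.put("name", "bob");
        final Map<String, Object> envelopeValues = new HashMap<>();
        envelopeValues.put("op", "INSERT");
        envelopeValues.put("account", new MapRecord(accountSchema, accountValues));
        final Record envelope = new MapRecord(envelopeSchema, envelopeValues);
        // With <Data RecordPath> set to "/account", PutKudu sends only the selected child
        // Record(s) to Kudu; a selected field that is not of type RECORD fails the FlowFile.
        final Record dataRecord = (Record) RecordPath.compile("/account")
            .evaluate(envelope)
            .getSelectedFields()
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("RecordPath yielded no results"))
            .getValue();
        System.out.println(dataRecord.getSchema().getFieldNames()); // prints [id, name]
    }
}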