You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/10/27 23:03:48 UTC

[nifi] branch main updated: NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7496899  NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu
7496899 is described below

commit 74968991d5034c96c481386f76929e9a49cab8a1
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 27 11:40:12 2020 -0400

    NIFI-7952: Allow RecordPath to be used for specifying the Insertion Operation and the data to be inserted into Kudu
---
 .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml  |   5 +
 .../processors/kudu/AbstractKuduProcessor.java     | 140 ++++++-------
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 227 +++++++++++++++------
 3 files changed, 241 insertions(+), 131 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 34dea1e..bf38d47 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -85,6 +85,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.13.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-security-utils</artifactId>
             <version>1.13.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index 520fca3..0ae8e40 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -179,8 +179,8 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
 
     protected KuduClient buildClient(final ProcessContext context) {
         final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-        final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final Integer adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int adminOperationTimeout = context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
         return new KuduClient.KuduClientBuilder(masters)
                 .defaultOperationTimeoutMs(operationTimeout)
@@ -295,68 +295,72 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
             if (lowercaseFields) {
                 colName = colName.toLowerCase();
             }
-            int colIdx = this.getColumnIndex(schema, colName);
-            if (colIdx != -1) {
-                ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
-                Type colType = colSchema.getType();
-                if (record.getValue(recordFieldName) == null) {
-                    if (schema.getColumnByIndex(colIdx).isKey()) {
-                        throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
-                    } else if(!schema.getColumnByIndex(colIdx).isNullable()) {
-                        throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
-                    }
-
-                    if (!ignoreNull) {
-                        row.setNull(colName);
-                    }
-                } else {
-                    Object value = record.getValue(recordFieldName);
-                    switch (colType) {
-                        case BOOL:
-                            row.addBoolean(colIdx, DataTypeUtils.toBoolean(value, recordFieldName));
-                            break;
-                        case INT8:
-                            row.addByte(colIdx, DataTypeUtils.toByte(value, recordFieldName));
-                            break;
-                        case INT16:
-                            row.addShort(colIdx,  DataTypeUtils.toShort(value, recordFieldName));
-                            break;
-                        case INT32:
-                            row.addInt(colIdx,  DataTypeUtils.toInteger(value, recordFieldName));
-                            break;
-                        case INT64:
-                            row.addLong(colIdx,  DataTypeUtils.toLong(value, recordFieldName));
-                            break;
-                        case UNIXTIME_MICROS:
-                            DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
-                            Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
-                                    () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
-                            row.addTimestamp(colIdx, timestamp);
-                            break;
-                        case STRING:
-                            row.addString(colIdx, DataTypeUtils.toString(value, recordFieldName));
-                            break;
-                        case BINARY:
-                            row.addBinary(colIdx, DataTypeUtils.toString(value, recordFieldName).getBytes());
-                            break;
-                        case FLOAT:
-                            row.addFloat(colIdx, DataTypeUtils.toFloat(value, recordFieldName));
-                            break;
-                        case DOUBLE:
-                            row.addDouble(colIdx, DataTypeUtils.toDouble(value, recordFieldName));
-                            break;
-                        case DECIMAL:
-                            row.addDecimal(colIdx, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
-                            break;
-                        case VARCHAR:
-                            row.addVarchar(colIdx, DataTypeUtils.toString(value, recordFieldName));
-                            break;
-                        case DATE:
-                            row.addDate(colIdx, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
-                            break;
-                        default:
-                            throw new IllegalStateException(String.format("unknown column type %s", colType));
-                    }
+
+            if (!schema.hasColumn(colName)) {
+                continue;
+            }
+
+            final int columnIndex = schema.getColumnIndex(colName);
+            final ColumnSchema colSchema = schema.getColumnByIndex(columnIndex);
+            final Type colType = colSchema.getType();
+
+            if (record.getValue(recordFieldName) == null) {
+                if (schema.getColumnByIndex(columnIndex).isKey()) {
+                    throw new IllegalArgumentException(String.format("Can't set primary key column %s to null ", colName));
+                } else if(!schema.getColumnByIndex(columnIndex).isNullable()) {
+                    throw new IllegalArgumentException(String.format("Can't set column %s to null ", colName));
+                }
+
+                if (!ignoreNull) {
+                    row.setNull(colName);
+                }
+            } else {
+                Object value = record.getValue(recordFieldName);
+                switch (colType) {
+                    case BOOL:
+                        row.addBoolean(columnIndex, DataTypeUtils.toBoolean(value, recordFieldName));
+                        break;
+                    case INT8:
+                        row.addByte(columnIndex, DataTypeUtils.toByte(value, recordFieldName));
+                        break;
+                    case INT16:
+                        row.addShort(columnIndex,  DataTypeUtils.toShort(value, recordFieldName));
+                        break;
+                    case INT32:
+                        row.addInt(columnIndex,  DataTypeUtils.toInteger(value, recordFieldName));
+                        break;
+                    case INT64:
+                        row.addLong(columnIndex,  DataTypeUtils.toLong(value, recordFieldName));
+                        break;
+                    case UNIXTIME_MICROS:
+                        DataType fieldType = record.getSchema().getDataType(recordFieldName).get();
+                        Timestamp timestamp = DataTypeUtils.toTimestamp(record.getValue(recordFieldName),
+                                () -> DataTypeUtils.getDateFormat(fieldType.getFormat()), recordFieldName);
+                        row.addTimestamp(columnIndex, timestamp);
+                        break;
+                    case STRING:
+                        row.addString(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+                        break;
+                    case BINARY:
+                        row.addBinary(columnIndex, DataTypeUtils.toString(value, recordFieldName).getBytes());
+                        break;
+                    case FLOAT:
+                        row.addFloat(columnIndex, DataTypeUtils.toFloat(value, recordFieldName));
+                        break;
+                    case DOUBLE:
+                        row.addDouble(columnIndex, DataTypeUtils.toDouble(value, recordFieldName));
+                        break;
+                    case DECIMAL:
+                        row.addDecimal(columnIndex, new BigDecimal(DataTypeUtils.toString(value, recordFieldName)));
+                        break;
+                    case VARCHAR:
+                        row.addVarchar(columnIndex, DataTypeUtils.toString(value, recordFieldName));
+                        break;
+                    case DATE:
+                        row.addDate(columnIndex, DataTypeUtils.toDate(value, () -> DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()), recordFieldName));
+                        break;
+                    default:
+                        throw new IllegalStateException(String.format("unknown column type %s", colType));
                 }
             }
         }
@@ -425,14 +429,6 @@ public abstract class AbstractKuduProcessor extends AbstractProcessor {
         return alterTable;
     }
 
-    private int getColumnIndex(Schema columns, String colName) {
-        try {
-            return columns.getColumnIndex(colName);
-        } catch (Exception ex) {
-            return -1;
-        }
-    }
-
     protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
         Upsert upsert = kuduTable.newUpsert();
         buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames, ignoreNull, lowercaseFields);
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 3e8e199..c811b6b 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -40,34 +40,42 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
 import org.apache.nifi.security.krb.KerberosAction;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSet;
 
 import javax.security.auth.login.LoginException;
-import java.io.IOException;
 import java.io.InputStream;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
 
 @EventDriven
 @SupportsBatching
@@ -86,7 +94,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .description("The name of the Kudu Table to put data into")
         .required(true)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .build();
 
     public static final PropertyDescriptor RECORD_READER = new Builder()
@@ -113,7 +121,7 @@ public class PutKudu extends AbstractKuduProcessor {
             .description("Convert column names to lowercase when finding index of Kudu table columns")
             .defaultValue("false")
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
@@ -123,10 +131,31 @@ public class PutKudu extends AbstractKuduProcessor {
                     "are encountered, the Kudu table will be altered to include new columns for those fields.")
             .defaultValue("false")
             .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor DATA_RECORD_PATH = new Builder()
+        .name("Data RecordPath")
+        .displayName("Data RecordPath")
+        .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record and the Record that results from evaluating the RecordPath will be sent to" +
+            " Kudu instead of sending the entire incoming Record. If not specified, the entire incoming Record will be published to Kudu.")
+        .required(false)
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(NONE)
+        .build();
+
+    static final PropertyDescriptor OPERATION_RECORD_PATH = new Builder()
+        .name("Operation RecordPath")
+        .displayName("Operation RecordPath")
+        .description("If specified, this property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the Kudu Operation Type. When evaluated, the " +
+            "RecordPath must evaluate to one of the valid Kudu Operation Types, or the incoming FlowFile will be routed to failure. If this property is specified, the <Kudu Operation Type> property" +
+            " will be ignored.")
+        .required(false)
+        .addValidator(new RecordPathValidator())
+        .expressionLanguageSupported(NONE)
+        .build();
+
     protected static final Validator OperationTypeValidator = new Validator() {
         @Override
         public ValidationResult validate(String subject, String value, ValidationContext context) {
@@ -156,9 +185,10 @@ public class PutKudu extends AbstractKuduProcessor {
         .displayName("Kudu Operation Type")
         .description("Specify operationType for this processor.\n" +
                 "Valid values are: " +
-                Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")))
+                Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")) +
+                ". This Property will be ignored if the <Operation RecordPath> property is set.")
         .defaultValue(OperationType.INSERT.toString())
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .addValidator(OperationTypeValidator)
         .build();
 
@@ -184,7 +214,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .defaultValue("1")
         .required(true)
         .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();
 
     protected static final PropertyDescriptor BATCH_SIZE = new Builder()
@@ -196,7 +226,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .defaultValue("100")
         .required(true)
         .addValidator(StandardValidators.createLongValidator(1, 100000, true))
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .build();
 
     protected static final PropertyDescriptor IGNORE_NULL = new Builder()
@@ -205,7 +235,7 @@ public class PutKudu extends AbstractKuduProcessor {
         .defaultValue("false")
         .required(true)
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .build();
 
     protected static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -220,9 +250,11 @@ public class PutKudu extends AbstractKuduProcessor {
     public static final String RECORD_COUNT_ATTR = "record.count";
 
     // Properties set in onScheduled.
-    protected int batchSize = 100;
-    protected int ffbatch   = 1;
-    protected SessionConfiguration.FlushMode flushMode;
+    private volatile int batchSize = 100;
+    private volatile int ffbatch   = 1;
+    private volatile SessionConfiguration.FlushMode flushMode;
+    private volatile Function<Record, OperationType> recordPathOperationType;
+    private volatile RecordPath dataRecordPath;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -236,6 +268,8 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(LOWERCASE_FIELD_NAMES);
         properties.add(HANDLE_SCHEMA_DRIFT);
         properties.add(RECORD_READER);
+        properties.add(DATA_RECORD_PATH);
+        properties.add(OPERATION_RECORD_PATH);
         properties.add(INSERT_OPERATION);
         properties.add(FLUSH_MODE);
         properties.add(FLOWFILE_BATCH_SIZE);
@@ -255,11 +289,22 @@ public class PutKudu extends AbstractKuduProcessor {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException, LoginException {
+    public void onScheduled(final ProcessContext context) throws LoginException {
         batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         ffbatch   = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
         flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
         createKerberosUserAndOrKuduClient(context);
+
+        final String operationRecordPathValue = context.getProperty(OPERATION_RECORD_PATH).getValue();
+        if (operationRecordPathValue == null) {
+            recordPathOperationType = null;
+        } else {
+            final RecordPath recordPath = RecordPath.compile(operationRecordPathValue);
+            recordPathOperationType = new RecordPathOperationType(recordPath);
+        }
+
+        final String dataRecordPathValue = context.getProperty(DATA_RECORD_PATH).getValue();
+        dataRecordPath = dataRecordPathValue == null ? null : RecordPath.compile(dataRecordPathValue);
     }
 
     @Override
@@ -301,83 +346,119 @@ public class PutKudu extends AbstractKuduProcessor {
                 final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
 
                 final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
-                final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
                 final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
                 final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
-                final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+                final boolean handleSchemaDrift = Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));
+
+                final Function<Record, OperationType> operationTypeFunction;
+                if (recordPathOperationType == null) {
+                    final OperationType staticOperationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
+                    operationTypeFunction = record -> staticOperationType;
+                } else {
+                    operationTypeFunction = recordPathOperationType;
+                }
 
                 final RecordSet recordSet = recordReader.createRecordSet();
-                final List<String> fieldNames = recordReader.getSchema().getFieldNames();
                 KuduTable kuduTable = kuduClient.openTable(tableName);
 
                 // If handleSchemaDrift is true, check for any missing columns and alter the Kudu table to add them.
                 if (handleSchemaDrift) {
                     final Schema schema = kuduTable.getSchema();
-                    Stream<RecordField> fields = recordReader.getSchema().getFields().stream();
-                    List<RecordField> missing = fields.filter(field -> !schema.hasColumn(
-                            lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
-                            .collect(Collectors.toList());
+                    final List<RecordField> missing = recordReader.getSchema().getFields().stream()
+                        .filter(field -> !schema.hasColumn(lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName()))
+                        .collect(Collectors.toList());
+
                     if (!missing.isEmpty()) {
-                        getLogger().info("adding {} columns to table '{}' to handle schema drift",
-                                new Object[]{missing.size(), tableName});
+                        getLogger().info("adding {} columns to table '{}' to handle schema drift", new Object[]{missing.size(), tableName});
+
                         // Add each column one at a time to avoid failing if some of the missing columns
                         // we created by a concurrent thread or application attempting to handle schema drift.
-                        for (RecordField field : missing) {
+                        for (final RecordField field : missing) {
                             try {
                                 final String columnName = lowercaseFields ? field.getFieldName().toLowerCase() : field.getFieldName();
                                 kuduClient.alterTable(tableName, getAddNullableColumnStatement(columnName, field.getDataType()));
-                            } catch (KuduException e) {
+                            } catch (final KuduException e) {
                                 // Ignore the exception if the column already exists due to concurrent
                                 // threads or applications attempting to handle schema drift.
                                 if (e.getStatus().isAlreadyPresent()) {
-                                    getLogger().info("column already exists in table '{}' while handling schema drift",
-                                            new Object[]{tableName});
+                                    getLogger().info("Column already exists in table '{}' while handling schema drift", new Object[]{tableName});
                                 } else {
                                     throw new ProcessException(e);
                                 }
                             }
                         }
+
                         // Re-open the table to get the new schema.
                         kuduTable = kuduClient.openTable(tableName);
                     }
                 }
 
-                // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
-                // Because the session is shared across flow files, for batching efficiency, we
-                // need to flush when changing to and from INSERT_IGNORE operation types.
-                // This should be updated and simplified when KUDU-1563 is completed.
-                if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
-                    flushKuduSession(kuduSession, false, pendingRowErrors);
-                    kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
-                }
-                prevOperationType = operationType;
-
                 Record record = recordSet.next();
-                while (record != null) {
-                    Operation operation = createKuduOperation(operationType, record, fieldNames, ignoreNull, lowercaseFields, kuduTable);
-                    // We keep track of mappings between Operations and their origins,
-                    // so that we know which FlowFiles should be marked failure after buffered flush.
-                    operationFlowFileMap.put(operation, flowFile);
-
-                    // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
-                    // but the buffer is too big" error. This can happen when flush mode is
-                    // MANUAL_FLUSH and a FlowFile has more than one records.
-                    if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
-                        numBuffered = 0;
-                        flushKuduSession(kuduSession, false, pendingRowErrors);
+                recordReaderLoop: while (record != null) {
+                    final OperationType operationType = operationTypeFunction.apply(record);
+
+                    final List<Record> dataRecords;
+                    if (dataRecordPath == null) {
+                        dataRecords = Collections.singletonList(record);
+                    } else {
+                        final RecordPathResult result = dataRecordPath.evaluate(record);
+                        final List<FieldValue> fieldValues = result.getSelectedFields().collect(Collectors.toList());
+                        if (fieldValues.isEmpty()) {
+                            throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record yielded no results.");
+                        }
+
+                        for (final FieldValue fieldValue : fieldValues) {
+                            final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType();
+                            if (fieldType != RecordFieldType.RECORD) {
+                                throw new ProcessException("RecordPath " + dataRecordPath.getPath() + " evaluated against Record expected to return one or more Records but encountered field of type" +
+                                    " " + fieldType);
+                            }
+                        }
+
+                        dataRecords = new ArrayList<>(fieldValues.size());
+                        for (final FieldValue fieldValue : fieldValues) {
+                            dataRecords.add((Record) fieldValue.getValue());
+                        }
                     }
 
-                    // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
-                    OperationResponse response = kuduSession.apply(operation);
-                    if (response != null && response.hasRowError()) {
-                        // Stop processing the records on the first error.
-                        // Note that Kudu does not support rolling back of previous operations.
-                        flowFileFailures.put(flowFile, response.getRowError());
-                        break;
+                    for (final Record dataRecord : dataRecords) {
+                        // In the case of INSERT_IGNORE the Kudu session is modified to ignore row errors.
+                        // Because the session is shared across flow files, for batching efficiency, we
+                        // need to flush when changing to and from INSERT_IGNORE operation types.
+                        // This should be updated and simplified when KUDU-1563 is completed.
+                        if (prevOperationType != operationType && (prevOperationType == OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
+                            flushKuduSession(kuduSession, false, pendingRowErrors);
+                            kuduSession.setIgnoreAllDuplicateRows(operationType == OperationType.INSERT_IGNORE);
+                        }
+                        prevOperationType = operationType;
+
+                        final List<String> fieldNames = dataRecord.getSchema().getFieldNames();
+                        Operation operation = createKuduOperation(operationType, dataRecord, fieldNames, ignoreNull, lowercaseFields, kuduTable);
+                        // We keep track of mappings between Operations and their origins,
+                        // so that we know which FlowFiles should be marked failure after buffered flush.
+                        operationFlowFileMap.put(operation, flowFile);
+
+                        // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled
+                        // but the buffer is too big" error. This can happen when flush mode is
+                        // MANUAL_FLUSH and a FlowFile has more than one records.
+                        if (numBuffered == batchSize && flushMode == SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+                            numBuffered = 0;
+                            flushKuduSession(kuduSession, false, pendingRowErrors);
+                        }
+
+                        // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC
+                        OperationResponse response = kuduSession.apply(operation);
+                        if (response != null && response.hasRowError()) {
+                            // Stop processing the records on the first error.
+                            // Note that Kudu does not support rolling back of previous operations.
+                            flowFileFailures.put(flowFile, response.getRowError());
+                            break recordReaderLoop;
+                        }
+
+                        numBuffered++;
+                        numRecords.merge(flowFile, 1, Integer::sum);
                     }
 
-                    numBuffered++;
-                    numRecords.merge(flowFile, 1, Integer::sum);
                     record = recordSet.next();
                 }
             } catch (Exception ex) {
@@ -460,4 +541,39 @@ public class PutKudu extends AbstractKuduProcessor {
                 throw new IllegalArgumentException(String.format("OperationType: %s not supported by Kudu", operationType));
         }
     }
+
+    /**
+     * Resolves the Kudu {@link OperationType} for a Record by evaluating a RecordPath against it.
+     * The RecordPath must yield exactly one distinct result whose String form, upper-cased, is the name
+     * of an {@link OperationType} constant; otherwise a {@link ProcessException} is thrown.
+     */
+    private static class RecordPathOperationType implements Function<Record, OperationType> {
+        private final RecordPath recordPath;
+
+        public RecordPathOperationType(final RecordPath recordPath) {
+            this.recordPath = recordPath;
+        }
+
+        @Override
+        public OperationType apply(final Record record) {
+            final RecordPathResult recordPathResult = recordPath.evaluate(record);
+            final List<FieldValue> resultList = recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+            if (resultList.isEmpty()) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record but got no results");
+            }
+
+            if (resultList.size() > 1) {
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record and received multiple distinct results (" + resultList + ")");
+            }
+
+            final String resultValue = String.valueOf(resultList.get(0).getValue());
+            try {
+                // Upper-case with Locale.ROOT so the enum lookup does not depend on the JVM default locale.
+                return OperationType.valueOf(resultValue.toUpperCase(java.util.Locale.ROOT));
+            } catch (final IllegalArgumentException iae) {
+                // Chain the original exception so the root cause is not lost.
+                throw new ProcessException("Evaluated RecordPath " + recordPath.getPath() + " against Record to determine Kudu Operation Type but found invalid value: " + resultValue, iae);
+            }
+        }
+    }
 }