Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/27 17:56:56 UTC

[2/5] nifi git commit: NIFI-3415: Add Rollback on Failure.

http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index fd414c4..0f65014 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -31,13 +31,19 @@ import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -53,7 +59,9 @@ import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
+import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.SQLNonTransientException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -66,7 +74,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
+
+import static java.lang.String.format;
 
 
 @EventDriven
@@ -83,7 +92,7 @@ import java.util.stream.IntStream;
         + "will be used to determine the type of statement (INSERT, UPDATE, DELETE, SQL, etc.) to generate and execute.")
 @WritesAttribute(attribute = PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR, description = "If an error occurs during processing, the flow file will be routed to failure or retry, and this attribute "
         + "will be populated with the cause of the error.")
-public class PutDatabaseRecord extends AbstractProcessor {
+public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
 
     static final String UPDATE_TYPE = "UPDATE";
     static final String INSERT_TYPE = "INSERT";
@@ -289,10 +298,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
         pds.add(QUOTED_IDENTIFIERS);
         pds.add(QUOTED_TABLE_IDENTIFIER);
         pds.add(QUERY_TIMEOUT);
+        pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
 
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
+    private Put<FunctionContext, Connection> process;
+    private ExceptionHandler<FunctionContext> exceptionHandler;
 
     @Override
     public Set<Relationship> getRelationships() {
@@ -316,318 +328,356 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 .build();
     }
 
+    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> {
+        final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class).getConnection();
+        try {
+            fc.originalAutoCommit = connection.getAutoCommit();
+            connection.setAutoCommit(false);
+
+            String jdbcUrl = "DBCPService";
+            try {
+                DatabaseMetaData databaseMetaData = connection.getMetaData();
+                if (databaseMetaData != null) {
+                    jdbcUrl = databaseMetaData.getURL();
+                }
+            } catch (SQLException se) {
+                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+            } finally {
+                fc.jdbcUrl = jdbcUrl;
+            }
+
+        } catch (SQLException e) {
+            throw new ProcessException("Failed to disable auto commit due to " + e, e);
+        }
+        return connection;
+    };
+
+    private final Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, functionContext, conn, flowFile, result) -> {
+
+        exceptionHandler.execute(functionContext, flowFile, inputFlowFile -> {
+
+            // Get the statement type from the attribute if necessary
+            final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
+            String statementType = statementTypeProperty;
+            if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
+                statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
+            }
+            if (StringUtils.isEmpty(statementType)) {
+                final String msg = format("Statement Type is not specified, FlowFile %s", inputFlowFile);
+                throw new IllegalArgumentException(msg);
+            }
+
+
+            try (final InputStream in = session.read(inputFlowFile)) {
+
+                final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+                        .asControllerService(RecordReaderFactory.class);
+                final RecordReader recordParser = recordParserFactory.createRecordReader(inputFlowFile, in, getLogger());
+
+                if (SQL_TYPE.equalsIgnoreCase(statementType)) {
+                    executeSQL(context, session, inputFlowFile, functionContext, result, conn, recordParser);
+
+                } else {
+                    final DMLSettings settings = new DMLSettings(context);
+                    executeDML(context, session, inputFlowFile, functionContext, result, conn, recordParser, statementType, settings);
+                }
+            }
+
+        }, (fc, inputFlowFile, r, e) -> {
+
+            getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, e);
+
+            if (e instanceof BatchUpdateException) {
+                try {
+                    // Although the process session will move forward in order to route the failed FlowFile,
+                    // the database transaction should be rolled back to avoid a partial batch update.
+                    conn.rollback();
+                } catch (SQLException re) {
+                    getLogger().error("Failed to rollback database due to {}, transaction may be incomplete.", new Object[]{re}, re);
+                }
+            }
+
+            // Embed the Exception detail in a FlowFile attribute, then delegate error handling to the default and rollbackOnFailure handlers.
+            final FlowFile flowFileWithAttributes = session.putAttribute(inputFlowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
+            final ExceptionHandler.OnError<FunctionContext, FlowFile> defaultOnError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+            final ExceptionHandler.OnError<FunctionContext, FlowFile> rollbackOnFailure = RollbackOnFailure.createOnError(defaultOnError);
+            rollbackOnFailure.apply(fc, flowFileWithAttributes, r, e);
+        });
+    };
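
The putFlowFile function above follows the two-callback shape of ExceptionHandler.execute: a procedure that performs the work, and an onError callback that decides routing when the procedure throws. A minimal generic sketch of that shape, assuming simplified functional interfaces (the real ones in org.apache.nifi.processor.util.pattern carry additional routing context and consult mapException/adjustError first):

    interface Procedure<I> { void apply(I input) throws Exception; }
    interface OnError<C, I> { void apply(C context, I input, Exception e); }

    static <C, I> void execute(final C context, final I input,
                               final Procedure<I> procedure, final OnError<C, I> onError) {
        try {
            procedure.apply(input);           // the happy path
        } catch (final Exception e) {
            onError.apply(context, input, e); // classify, attribute, and route the failure
        }
    }
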
+
+
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         synchronized (this) {
             schemaCache.clear();
         }
+
+        process = new Put<>();
+
+        process.setLogger(getLogger());
+        process.initConnection(initConnection);
+        process.putFlowFile(putFlowFile);
+        process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+
+        process.onCompleted((c, s, fc, conn) -> {
+            try {
+                conn.commit();
+            } catch (SQLException e) {
+                // Throw ProcessException to roll back the process session.
+                throw new ProcessException("Failed to commit database connection due to " + e, e);
+            }
+        });
+
+        process.onFailed((c, s, fc, conn, e) -> {
+            try {
+                conn.rollback();
+            } catch (SQLException re) {
+                // Just log the fact that rollback failed.
+                // ProcessSession will be rolled back by the thrown Exception, so we don't have to do anything else here.
+                getLogger().warn("Failed to rollback database connection due to %s", new Object[]{re}, re);
+            }
+        });
+
+        process.cleanup((c, s, fc, conn) -> {
+            // make sure that we try to set the auto commit back to whatever it was.
+            if (fc.originalAutoCommit) {
+                try {
+                    conn.setAutoCommit(true);
+                } catch (final SQLException se) {
+                    getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
+                }
+            }
+        });
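
Taken together, initConnection, onCompleted, onFailed, and cleanup reproduce the manual JDBC transaction discipline that the old onTrigger performed inline. A condensed plain-JDBC sketch of that discipline (assuming a hypothetical doWork method and an enclosing method that declares throws SQLException):

    final Connection conn = dbcpService.getConnection();
    final boolean originalAutoCommit = conn.getAutoCommit();
    try {
        conn.setAutoCommit(false);    // initConnection: group the statements into one transaction
        doWork(conn);                 // putFlowFile: execute the generated statements
        conn.commit();                // onCompleted
    } catch (final SQLException e) {
        conn.rollback();              // onFailed
        throw e;
    } finally {
        if (originalAutoCommit) {
            conn.setAutoCommit(true); // cleanup: restore the connection's original mode
        }
    }
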
+
+        exceptionHandler = new ExceptionHandler<>();
+        exceptionHandler.mapException(s -> {
+
+            try {
+                if (s == null) {
+                    return ErrorTypes.PersistentFailure;
+                }
+                throw s;
+
+            } catch (IllegalArgumentException
+                    |MalformedRecordException
+                    |SQLNonTransientException e) {
+                return ErrorTypes.InvalidInput;
+
+            } catch (IOException
+                    |SQLException e) {
+                return ErrorTypes.TemporalFailure;
+
+            } catch (Exception e) {
+                return ErrorTypes.UnknownFailure;
+            }
+
+        });
+        exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
     }
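
mapException above deliberately re-throws the exception so that Java's multi-catch dispatch, rather than a chain of instanceof tests, selects the ErrorTypes bucket. The instanceof equivalent after the null check, for comparison (a sketch only; note that SQLNonTransientException must be tested before its superclass SQLException, which the catch ordering guarantees automatically):

    if (s instanceof IllegalArgumentException || s instanceof MalformedRecordException
            || s instanceof SQLNonTransientException) {
        return ErrorTypes.InvalidInput;    // bad input data: route to failure
    }
    if (s instanceof IOException || s instanceof SQLException) {
        return ErrorTypes.TemporalFailure; // possibly transient: route to retry
    }
    return ErrorTypes.UnknownFailure;
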
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    private static class FunctionContext extends RollbackOnFailure {
+        private final int queryTimeout;
+        private boolean originalAutoCommit = false;
+        private String jdbcUrl;
 
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
+        public FunctionContext(boolean rollbackOnFailure, int queryTimeout) {
+            super(rollbackOnFailure, true);
+            this.queryTimeout = queryTimeout;
         }
+    }
 
-        final ComponentLog log = getLogger();
-
-        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
-                .asControllerService(RecordReaderFactory.class);
-        final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
-        final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
-        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+    static class DMLSettings {
+        private final boolean translateFieldNames;
+        private final boolean ignoreUnmappedFields;
 
         // Is the unmatched column behaviour fail or warning?
-        final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
-        final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+        private final boolean failUnmappedColumns;
+        private final boolean warningUnmappedColumns;
 
         // Escape column names?
-        final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
+        private final boolean escapeColumnNames;
 
         // Quote table name?
-        final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+        private final boolean quoteTableName;
+
+        private DMLSettings(ProcessContext context) {
+            translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+            ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
+
+            failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+            warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
+            escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
+            quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+        }
+
+    }
 
-        try (final Connection con = dbcpService.getConnection()) {
+    private void executeSQL(ProcessContext context, ProcessSession session,
+                            FlowFile flowFile, FunctionContext functionContext, RoutingResult result,
+                            Connection con, RecordReader recordParser)
+            throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
+
+        final RecordSchema recordSchema = recordParser.getSchema();
+
+        // Find which field has the SQL statement in it
+        final String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
+        if (StringUtils.isEmpty(sqlField)) {
+            throw new IllegalArgumentException(format("SQL specified as Statement Type but no Field Containing SQL was found, FlowFile %s", flowFile));
+        }
+
+        boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch((field) -> sqlField.equals(field.getFieldName()));
+        if (!schemaHasSqlField) {
+            throw new IllegalArgumentException(format("Record schema does not contain Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
+        }
+
+        try (Statement s = con.createStatement()) {
 
-            final boolean originalAutoCommit = con.getAutoCommit();
             try {
-                con.setAutoCommit(false);
+                s.setQueryTimeout(functionContext.queryTimeout); // timeout in seconds
+            } catch (SQLException se) {
+                // If the driver doesn't support query timeout, then assume it is "infinite". Only a timeout of zero is allowed in that case
+                if (functionContext.queryTimeout > 0) {
+                    throw se;
+                }
+            }
 
-                String jdbcURL = "DBCPService";
-                try {
-                    DatabaseMetaData databaseMetaData = con.getMetaData();
-                    if (databaseMetaData != null) {
-                        jdbcURL = databaseMetaData.getURL();
-                    }
-                } catch (SQLException se) {
-                    // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+            Record currentRecord;
+            while ((currentRecord = recordParser.nextRecord()) != null) {
+                Object sql = currentRecord.getValue(sqlField);
+                if (sql == null || StringUtils.isEmpty((String) sql)) {
+                    throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
                 }
 
-                final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
-                final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
-                final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-                final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
-                final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
+                // Execute the statement as-is
+                s.execute((String) sql);
+            }
+            result.routeTo(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
+        }
+    }
 
-                // Get the statement type from the attribute if necessary
-                String statementType = statementTypeProperty;
-                if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
-                    statementType = flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
-                }
-                if (StringUtils.isEmpty(statementType)) {
-                    log.error("Statement Type is not specified, flowfile {} will be penalized and routed to failure", new Object[]{flowFile});
-                    flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Statement Type not specified");
-                    flowFile = session.penalize(flowFile);
-                    session.transfer(flowFile, REL_FAILURE);
-                } else {
-                    RecordSchema recordSchema;
-                    try (final InputStream in = session.read(flowFile)) {
-
-                        final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, log);
-                        recordSchema = recordParser.getSchema();
-
-                        if (SQL_TYPE.equalsIgnoreCase(statementType)) {
-
-                            // Find which field has the SQL statement in it
-                            final String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
-                            if (StringUtils.isEmpty(sqlField)) {
-                                log.error("SQL specified as Statement Type but no Field Containing SQL was found, flowfile {} will be penalized and routed to failure", new Object[]{flowFile});
-                                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Field Containing SQL not found");
-                                flowFile = session.penalize(flowFile);
-                                session.transfer(flowFile, REL_FAILURE);
-                            } else {
-                                boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch((field) -> sqlField.equals(field.getFieldName()));
-                                if (schemaHasSqlField) {
-                                    try (Statement s = con.createStatement()) {
-
-                                        try {
-                                            s.setQueryTimeout(queryTimeout); // timeout in seconds
-                                        } catch (SQLException se) {
-                                            // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
-                                            if (queryTimeout > 0) {
-                                                throw se;
-                                            }
-                                        }
-
-                                        Record currentRecord;
-                                        while ((currentRecord = recordParser.nextRecord()) != null) {
-                                            Object sql = currentRecord.getValue(sqlField);
-                                            if (sql != null && !StringUtils.isEmpty((String) sql)) {
-                                                // Execute the statement as-is
-                                                s.execute((String) sql);
-                                            } else {
-                                                log.error("Record had no (or null) value for Field Containing SQL: {}, flowfile {} will be penalized and routed to failure",
-                                                        new Object[]{sqlField, flowFile});
-                                                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Field Containing SQL missing value");
-                                                flowFile = session.penalize(flowFile);
-                                                session.transfer(flowFile, REL_FAILURE);
-                                                return;
-                                            }
-                                        }
-                                        session.transfer(flowFile, REL_SUCCESS);
-                                        session.getProvenanceReporter().send(flowFile, jdbcURL);
-                                    } catch (final SQLNonTransientException e) {
-                                        log.error("Failed to update database for {} due to {}; rolling back database and routing to failure", new Object[]{flowFile, e}, e);
-                                        try {
-                                            con.rollback();
-                                        } catch (SQLException se) {
-                                            log.error("Failed to rollback database, transaction may be incomplete.", se);
-                                        }
-                                        flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                                        flowFile = session.penalize(flowFile);
-                                        session.transfer(flowFile, REL_FAILURE);
-                                    } catch (final SQLException e) {
-                                        log.error("Failed to update database for {} due to {}; rolling back database. It is possible that retrying the operation will succeed, so routing to retry",
-                                                new Object[]{flowFile, e}, e);
-                                        try {
-                                            con.rollback();
-                                        } catch (SQLException se) {
-                                            log.error("Failed to rollback database, transaction may be incomplete.", se);
-                                        }
-                                        flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                                        flowFile = session.penalize(flowFile);
-                                        session.transfer(flowFile, REL_RETRY);
-                                    }
-                                } else {
-                                    log.error("Record schema does not contain Field Containing SQL: {}, flowfile {} will be penalized and routed to failure", new Object[]{sqlField, flowFile});
-                                    flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Record schema missing Field Containing SQL value");
-                                    flowFile = session.penalize(flowFile);
-                                    session.transfer(flowFile, REL_FAILURE);
-                                }
-                            }
+    private void executeDML(ProcessContext context, ProcessSession session, FlowFile flowFile,
+                            FunctionContext functionContext, RoutingResult result, Connection con,
+                            RecordReader recordParser, String statementType, DMLSettings settings)
+            throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
 
-                        } else {
-                            // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
-                            if (StringUtils.isEmpty(tableName)) {
-                                log.error("Cannot process {} because Table Name is null or empty; penalizing and routing to failure", new Object[]{flowFile});
-                                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Table Name missing");
-                                flowFile = session.penalize(flowFile);
-                                session.transfer(flowFile, REL_FAILURE);
-                                return;
-                            }
-
-                            // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
-                            // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
-                            final boolean includePrimaryKeys = (updateKeys == null);
-
-                            // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
-                            // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
-                            // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
-                            // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
-                            TableSchema schema;
-                            synchronized (this) {
-                                schema = schemaCache.get(schemaKey);
-                                if (schema == null) {
-                                    // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
-                                    try (final Connection conn = dbcpService.getConnection()) {
-                                        schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
-                                        schemaCache.put(schemaKey, schema);
-                                    } catch (final SQLNonTransientException e) {
-                                        log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e}, e);
-                                        flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                                        flowFile = session.penalize(flowFile);
-                                        session.transfer(flowFile, REL_FAILURE);
-                                        return;
-                                    } catch (final SQLException e) {
-                                        log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
-                                                new Object[]{flowFile, e}, e);
-                                        flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                                        flowFile = session.penalize(flowFile);
-                                        session.transfer(flowFile, REL_RETRY);
-                                        return;
-                                    }
-                                }
-                            }
-
-                            final SqlAndIncludedColumns sqlHolder;
-                            try {
-                                // build the fully qualified table name
-                                final StringBuilder tableNameBuilder = new StringBuilder();
-                                if (catalog != null) {
-                                    tableNameBuilder.append(catalog).append(".");
-                                }
-                                if (schemaName != null) {
-                                    tableNameBuilder.append(schemaName).append(".");
-                                }
-                                tableNameBuilder.append(tableName);
-                                final String fqTableName = tableNameBuilder.toString();
-
-                                if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
-                                    sqlHolder = generateInsert(recordSchema, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
-                                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
-                                } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
-                                    sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
-                                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
-                                } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
-                                    sqlHolder = generateDelete(recordSchema, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
-                                            failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
-                                } else {
-                                    log.error("Statement Type {} is not valid, flowfile {} will be penalized and routed to failure", new Object[]{statementType, flowFile});
-                                    flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Statement Type invalid");
-                                    flowFile = session.penalize(flowFile);
-                                    session.transfer(flowFile, REL_FAILURE);
-                                    return;
-                                }
-                            } catch (final ProcessException pe) {
-                                log.error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
-                                        new Object[]{flowFile, statementType, pe.toString()}, pe);
-                                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, pe.getMessage());
-                                flowFile = session.penalize(flowFile);
-                                session.transfer(flowFile, REL_FAILURE);
-                                return;
-                            }
-
-                            try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql())) {
-
-                                try {
-                                    ps.setQueryTimeout(queryTimeout); // timeout in seconds
-                                } catch (SQLException se) {
-                                    // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
-                                    if (queryTimeout > 0) {
-                                        throw se;
-                                    }
-                                }
-
-                                Record currentRecord;
-                                List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
-
-                                while ((currentRecord = recordParser.nextRecord()) != null) {
-                                    Object[] values = currentRecord.getValues();
-                                    if (values != null) {
-                                        if (fieldIndexes != null) {
-                                            for (int i = 0; i < fieldIndexes.size(); i++) {
-                                                ps.setObject(i + 1, values[fieldIndexes.get(i)]);
-                                            }
-                                        } else {
-                                            // If there's no index map, assume all values are included and set them in order
-                                            for (int i = 0; i < values.length; i++) {
-                                                ps.setObject(i + 1, values[i]);
-                                            }
-                                        }
-                                        ps.addBatch();
-                                    }
-                                }
-
-                                log.debug("Executing query {}", new Object[]{sqlHolder});
-                                ps.executeBatch();
-                                session.transfer(flowFile, REL_SUCCESS);
-                                session.getProvenanceReporter().send(flowFile, jdbcURL);
-
-                            } catch (final SQLNonTransientException | BatchUpdateException e) {
-                                log.error("Failed to update database for {} due to {}; rolling back database, routing to failure", new Object[]{flowFile, e}, e);
-                                try {
-                                    con.rollback();
-                                } catch (SQLException se) {
-                                    log.error("Failed to rollback database, transaction may be incomplete.", se);
-                                }
-                                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                                flowFile = session.penalize(flowFile);
-                                session.transfer(flowFile, REL_FAILURE);
-                            } catch (final SQLException e) {
-                                log.error("Failed to update database for {} due to {}; rolling back database. It is possible that retrying the operation will succeed, so routing to retry",
-                                        new Object[]{flowFile, e}, e);
-                                try {
-                                    con.rollback();
-                                } catch (SQLException se) {
-                                    log.error("Failed to rollback database, transaction may be incomplete.", se);
-                                }
-                                flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                                flowFile = session.penalize(flowFile);
-                                session.transfer(flowFile, REL_RETRY);
-                            }
-                        }
-                    } catch (final MalformedRecordException | SchemaNotFoundException | IOException e) {
-                        log.error("Failed to determine schema of data records for {}, routing to failure", new Object[]{flowFile}, e);
+        final RecordSchema recordSchema = recordParser.getSchema();
+        final ComponentLog log = getLogger();
 
-                        flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
-                        flowFile = session.penalize(flowFile);
-                        session.transfer(flowFile, REL_FAILURE);
-                    }
+        final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
+        final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
+
+        // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
+        }
+
+        // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
+        // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
+        final boolean includePrimaryKeys = updateKeys == null;
+
+        // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
+        // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
+        // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
+        // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
+        TableSchema tableSchema;
+        synchronized (this) {
+            tableSchema = schemaCache.get(schemaKey);
+            if (tableSchema == null) {
+                // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
+                tableSchema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+                schemaCache.put(schemaKey, tableSchema);
+            }
+        }
+        if (tableSchema == null) {
+            throw new IllegalArgumentException("No table schema specified!");
+        }
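
The schemaCache declaration itself lies outside this hunk; the comment above describes a capacity-bounded LinkedHashMap. A minimal sketch of such a map (a hypothetical helper, not the actual field initializer):

    static <K, V> Map<K, V> boundedCache(final int capacity) {
        return new LinkedHashMap<K, V>(capacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                return size() > capacity; // evict the eldest entry once over capacity
            }
        };
    }
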
+
+        // build the fully qualified table name
+        final StringBuilder tableNameBuilder = new StringBuilder();
+        if (catalog != null) {
+            tableNameBuilder.append(catalog).append(".");
+        }
+        if (schemaName != null) {
+            tableNameBuilder.append(schemaName).append(".");
+        }
+        tableNameBuilder.append(tableName);
+        final String fqTableName = tableNameBuilder.toString();
+
+        if (recordSchema == null) {
+            throw new IllegalArgumentException("No record schema specified!");
+        }
+
+        final SqlAndIncludedColumns sqlHolder;
+        if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
+            sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
+
+        } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
+            sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+
+        } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+            sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
+
+        } else {
+            throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
+        }
+
+        try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql())) {
+
+            final int queryTimeout = functionContext.queryTimeout;
+            try {
+                ps.setQueryTimeout(queryTimeout); // timeout in seconds
+            } catch (SQLException se) {
+                // If the driver doesn't support query timeout, then assume it is "infinite". Only a timeout of zero is allowed in that case
+                if (queryTimeout > 0) {
+                    throw se;
                 }
-            } finally {
-                try {
-                    con.commit();
-                } finally {
-                    // make sure that we try to set the auto commit back to whatever it was.
-                    if (originalAutoCommit) {
-                        try {
-                            con.setAutoCommit(originalAutoCommit);
-                        } catch (final SQLException se) {
-                            // Nothing to do if it didn't work, indicates an issue with the driver
+            }
+
+            Record currentRecord;
+            List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
+
+            while ((currentRecord = recordParser.nextRecord()) != null) {
+                Object[] values = currentRecord.getValues();
+                if (values != null) {
+                    if (fieldIndexes != null) {
+                        for (int i = 0; i < fieldIndexes.size(); i++) {
+                            ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+                        }
+                    } else {
+                        // If there's no index map, assume all values are included and set them in order
+                        for (int i = 0; i < values.length; i++) {
+                            ps.setObject(i + 1, values[i]);
                         }
                     }
+                    ps.addBatch();
                 }
             }
-        } catch (final ProcessException | SQLException e) {
-            log.error("Error occurred during processing, yielding the processor", e);
-            context.yield();
+
+            log.debug("Executing query {}", new Object[]{sqlHolder});
+            ps.executeBatch();
+            result.routeTo(flowFile, REL_SUCCESS);
+            session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
+
         }
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+        final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+
+        final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, queryTimeout);
+
+        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
+
+    }
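
RollbackOnFailure.onTrigger wraps the session handling roughly as sketched below (an approximation of the behavior, not the actual implementation): when Rollback On Failure is enabled, a failure rolls the session back so the input FlowFile stays in the incoming queue instead of being routed to failure or retry.

    final ProcessSession session = sessionFactory.createSession();
    try {
        process.onTrigger(context, session, functionContext); // may throw ProcessException
        session.commit();
    } catch (final ProcessException e) {
        // createAdjustError/createAdjustRoute escalate failures to this point when
        // Rollback On Failure is enabled; rolling back returns the FlowFile to the queue.
        session.rollback();
        throw e;
    }
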
+
     private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) {
         final Set<String> normalizedFieldNames = new HashSet<>();
         if (schema != null) {
@@ -636,27 +686,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
         return normalizedFieldNames;
     }
 
-    SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName,
-                                         final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                         final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
-
-        if (recordSchema == null) {
-            throw new ProcessException("No table schema specified!");
-        }
-        if (tableSchema == null) {
-            throw new ProcessException("No table schema specified!");
-        }
+    SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
+            throws IllegalArgumentException, SQLException {
 
-        final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+        final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
 
         for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
-            final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+            final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
                 String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
-                if (failUnmappedColumns) {
+                if (settings.failUnmappedColumns) {
                     getLogger().error(missingColMessage);
-                    throw new ProcessException(missingColMessage);
-                } else if (warningUnmappedColumns) {
+                    throw new IllegalArgumentException(missingColMessage);
+                } else if (settings.warningUnmappedColumns) {
                     getLogger().warn(missingColMessage);
                 }
             }
@@ -664,7 +706,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("INSERT INTO ");
-        if (quoteTableName) {
+        if (settings.quoteTableName) {
             sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                     .append(tableName)
                     .append(tableSchema.getQuotedIdentifierString());
@@ -680,14 +722,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
             int fieldCount = fieldNames.size();
             AtomicInteger fieldsFound = new AtomicInteger(0);
 
-            IntStream.range(0, fieldCount).forEach((i) -> {
-
+            for (int i = 0; i < fieldCount; i++) {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
-                if (desc == null && !ignoreUnmappedFields) {
-                    throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
+                if (desc == null && !settings.ignoreUnmappedFields) {
+                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                 }
 
                 if (desc != null) {
@@ -695,7 +736,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         sqlBuilder.append(", ");
                     }
 
-                    if (escapeColumnNames) {
+                    if (settings.escapeColumnNames) {
                         sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                                 .append(desc.getColumnName())
                                 .append(tableSchema.getQuotedIdentifierString());
@@ -704,7 +745,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     }
                     includedColumns.add(i);
                 }
-            });
+            }
 
             // complete the SQL statements by adding ?'s for all of the values to be escaped.
             sqlBuilder.append(") VALUES (");
@@ -712,22 +753,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
             sqlBuilder.append(")");
 
             if (fieldsFound.get() == 0) {
-                throw new ProcessException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
             }
         }
         return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
     }
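
For illustration (hypothetical table and schema): given a record schema (id, name, code) that fully matches a table PERSONS, the builder above produces a parameterized statement such as

    INSERT INTO PERSONS (id, name, code) VALUES (?, ?, ?)

with includedColumns [0, 1, 2] recording which record field indexes feed the ? placeholders when executeDML binds values.
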
 
     SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys,
-                                         final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                         final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
-
-        if (recordSchema == null) {
-            throw new ProcessException("No table schema specified!");
-        }
-        if (tableSchema == null) {
-            throw new ProcessException("No table schema specified!");
-        }
+                                         final TableSchema tableSchema, final DMLSettings settings)
+            throws IllegalArgumentException, MalformedRecordException, SQLException {
 
         final Set<String> updateKeyNames;
         if (updateKeys == null) {
@@ -740,12 +774,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
         }
 
         if (updateKeyNames.isEmpty()) {
-            throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+            throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
         }
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("UPDATE ");
-        if (quoteTableName) {
+        if (settings.quoteTableName) {
             sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                     .append(tableName)
                     .append(tableSchema.getQuotedIdentifierString());
@@ -755,18 +789,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         // Create a Set of all normalized Update Key names, and ensure that there is a field in the record
         // for each of the Update Key fields.
-        final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+        final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
         final Set<String> normalizedUpdateNames = new HashSet<>();
         for (final String uk : updateKeyNames) {
-            final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+            final String normalizedUK = normalizeColumnName(uk, settings.translateFieldNames);
             normalizedUpdateNames.add(normalizedUK);
 
             if (!normalizedFieldNames.contains(normalizedUK)) {
                 String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
-                if (failUnmappedColumns) {
+                if (settings.failUnmappedColumns) {
                     getLogger().error(missingColMessage);
-                    throw new ProcessException(missingColMessage);
-                } else if (warningUnmappedColumns) {
+                    throw new MalformedRecordException(missingColMessage);
+                } else if (settings.warningUnmappedColumns) {
                     getLogger().warn(missingColMessage);
                 }
             }
@@ -781,18 +815,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
             int fieldCount = fieldNames.size();
             AtomicInteger fieldsFound = new AtomicInteger(0);
 
-            IntStream.range(0, fieldCount).forEach((i) -> {
-
+            for (int i = 0; i < fieldCount; i++) {
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
-                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+                final String normalizedColName = normalizeColumnName(fieldName, settings.translateFieldNames);
+                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc == null) {
-                    if (!ignoreUnmappedFields) {
-                        throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+                    if (!settings.ignoreUnmappedFields) {
+                        throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                     } else {
-                        return;
+                        continue;
                     }
                 }
 
@@ -803,7 +836,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         sqlBuilder.append(", ");
                     }
 
-                    if (escapeColumnNames) {
+                    if (settings.escapeColumnNames) {
                         sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                                 .append(desc.getColumnName())
                                 .append(tableSchema.getQuotedIdentifierString());
@@ -814,19 +847,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     sqlBuilder.append(" = ?");
                     includedColumns.add(i);
                 }
-            });
+            }
 
             // Set the WHERE clause based on the Update Key values
             sqlBuilder.append(" WHERE ");
             AtomicInteger whereFieldCount = new AtomicInteger(0);
 
-            IntStream.range(0, fieldCount).forEach((i) -> {
+            for (int i = 0; i < fieldCount; i++) {
 
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
-                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+                final String normalizedColName = normalizeColumnName(fieldName, settings.translateFieldNames);
+                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
                 if (desc != null) {
 
                     // Check if this column is a Update Key. If so, add it to the WHERE clause
@@ -836,7 +869,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                             sqlBuilder.append(" AND ");
                         }
 
-                        if (escapeColumnNames) {
+                        if (settings.escapeColumnNames) {
                             sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                                     .append(normalizedColName)
                                     .append(tableSchema.getQuotedIdentifierString());
@@ -847,31 +880,23 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         includedColumns.add(i);
                     }
                 }
-            });
+            }
         }
         return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
     }
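
Continuing the hypothetical PERSONS example with id as the primary key: the builder above emits the non-key columns in the SET clause and the key columns in the WHERE clause, producing a statement such as

    UPDATE PERSONS SET name = ?, code = ? WHERE id = ?

with includedColumns [1, 2, 0], so executeDML binds the record values in SET-then-WHERE order.
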
 
-    SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName,
-                                         final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
-                                         final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+    SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
+            throws IllegalArgumentException, MalformedRecordException, SQLDataException {
 
-        if (recordSchema == null) {
-            throw new ProcessException("No table schema specified!");
-        }
-        if (tableSchema == null) {
-            throw new ProcessException("No table schema specified!");
-        }
-
-        final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+        final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
         for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
-            final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+            final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
             if (!normalizedFieldNames.contains(normalizedColName)) {
                 String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
-                if (failUnmappedColumns) {
+                if (settings.failUnmappedColumns) {
                     getLogger().error(missingColMessage);
-                    throw new ProcessException(missingColMessage);
-                } else if (warningUnmappedColumns) {
+                    throw new MalformedRecordException(missingColMessage);
+                } else if (settings.warningUnmappedColumns) {
                     getLogger().warn(missingColMessage);
                 }
             }
@@ -879,7 +904,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
         final StringBuilder sqlBuilder = new StringBuilder();
         sqlBuilder.append("DELETE FROM ");
-        if (quoteTableName) {
+        if (settings.quoteTableName) {
             sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                     .append(tableName)
                     .append(tableSchema.getQuotedIdentifierString());
@@ -895,14 +920,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
             int fieldCount = fieldNames.size();
             AtomicInteger fieldsFound = new AtomicInteger(0);
 
-            IntStream.range(0, fieldCount).forEach((i) -> {
+            for (int i = 0; i < fieldCount; i++) {
 
                 RecordField field = recordSchema.getField(i);
                 String fieldName = field.getFieldName();
 
-                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
-                if (desc == null && !ignoreUnmappedFields) {
-                    throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+                final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
+                if (desc == null && !settings.ignoreUnmappedFields) {
+                    throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
                 }
 
                 if (desc != null) {
@@ -910,7 +935,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                         sqlBuilder.append(" AND ");
                     }
 
-                    if (escapeColumnNames) {
+                    if (settings.escapeColumnNames) {
                         sqlBuilder.append(tableSchema.getQuotedIdentifierString())
                                 .append(desc.getColumnName())
                                 .append(tableSchema.getQuotedIdentifierString());
@@ -921,10 +946,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
                     includedColumns.add(i);
 
                 }
-            });
+            }
 
             if (fieldsFound.get() == 0) {
-                throw new ProcessException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+                throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
             }
         }
 
@@ -985,7 +1010,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
 
                 final Set<String> primaryKeyColumns = new HashSet<>();
                 if (includePrimaryKeys) {
-                    try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+                    try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName)) {
 
                         while (pkrs.next()) {
                             final String colName = pkrs.getString("COLUMN_NAME");