You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/27 17:56:56 UTC
[2/5] nifi git commit: NIFI-3415: Add Rollback on Failure.
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index fd414c4..0f65014 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -31,13 +31,19 @@ import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.processor.util.pattern.ErrorTypes;
+import org.apache.nifi.processor.util.pattern.ExceptionHandler;
+import org.apache.nifi.processor.util.pattern.PartialFunctions;
+import org.apache.nifi.processor.util.pattern.Put;
+import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
+import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -53,7 +59,9 @@ import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
+import java.sql.SQLDataException;
import java.sql.SQLException;
+import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLNonTransientException;
import java.sql.Statement;
import java.util.ArrayList;
@@ -66,7 +74,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
+
+import static java.lang.String.format;
@EventDriven
@@ -83,7 +92,7 @@ import java.util.stream.IntStream;
+ "will be used to determine the type of statement (INSERT, UPDATE, DELETE, SQL, etc.) to generate and execute.")
@WritesAttribute(attribute = PutDatabaseRecord.PUT_DATABASE_RECORD_ERROR, description = "If an error occurs during processing, the flow file will be routed to failure or retry, and this attribute "
+ "will be populated with the cause of the error.")
-public class PutDatabaseRecord extends AbstractProcessor {
+public class PutDatabaseRecord extends AbstractSessionFactoryProcessor {
static final String UPDATE_TYPE = "UPDATE";
static final String INSERT_TYPE = "INSERT";
@@ -289,10 +298,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
pds.add(QUOTED_IDENTIFIERS);
pds.add(QUOTED_TABLE_IDENTIFIER);
pds.add(QUERY_TIMEOUT);
+ pds.add(RollbackOnFailure.ROLLBACK_ON_FAILURE);
propDescriptors = Collections.unmodifiableList(pds);
}
+ private Put<FunctionContext, Connection> process;
+ private ExceptionHandler<FunctionContext> exceptionHandler;
@Override
public Set<Relationship> getRelationships() {
@@ -316,318 +328,356 @@ public class PutDatabaseRecord extends AbstractProcessor {
.build();
}
+ private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc) -> {
+ final Connection connection = c.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class).getConnection();
+ try {
+ fc.originalAutoCommit = connection.getAutoCommit();
+ connection.setAutoCommit(false);
+
+ String jdbcUrl = "DBCPService";
+ try {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ if (databaseMetaData != null) {
+ jdbcUrl = databaseMetaData.getURL();
+ }
+ } catch (SQLException se) {
+ // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+ } finally {
+ fc.jdbcUrl = jdbcUrl;
+ }
+
+ } catch (SQLException e) {
+ throw new ProcessException("Failed to disable auto commit due to " + e, e);
+ }
+ return connection;
+ };
+
+ private final Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, functionContext, conn, flowFile, result) -> {
+
+ exceptionHandler.execute(functionContext, flowFile, inputFlowFile -> {
+
+ // Get the statement type from the attribute if necessary
+ final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
+ String statementType = statementTypeProperty;
+ if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
+ statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
+ }
+ if (StringUtils.isEmpty(statementType)) {
+ final String msg = format("Statement Type is not specified, FlowFile %s", inputFlowFile);
+ throw new IllegalArgumentException(msg);
+ }
+
+
+ try (final InputStream in = session.read(inputFlowFile)) {
+
+ final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
+ .asControllerService(RecordReaderFactory.class);
+ final RecordReader recordParser = recordParserFactory.createRecordReader(inputFlowFile, in, getLogger());
+
+ if (SQL_TYPE.equalsIgnoreCase(statementType)) {
+ executeSQL(context, session, inputFlowFile, functionContext, result, conn, recordParser);
+
+ } else {
+ final DMLSettings settings = new DMLSettings(context);
+ executeDML(context, session, inputFlowFile, functionContext, result, conn, recordParser, statementType, settings);
+ }
+ }
+
+ }, (fc, inputFlowFile, r, e) -> {
+
+ getLogger().warn("Failed to process {} due to {}", new Object[]{inputFlowFile, e}, e);
+
+ if (e instanceof BatchUpdateException) {
+ try {
+ // Although process session will move forward in order to route the failed FlowFile,
+ // database transaction should be rolled back to avoid partial batch update.
+ conn.rollback();
+ } catch (SQLException re) {
+ getLogger().error("Failed to rollback database due to {}, transaction may be incomplete.", new Object[]{re}, re);
+ }
+ }
+
+            // Embed the Exception detail in a FlowFile attribute, then delegate error handling to the default handler and rollbackOnFailure.
+ final FlowFile flowFileWithAttributes = session.putAttribute(inputFlowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
+ final ExceptionHandler.OnError<FunctionContext, FlowFile> defaultOnError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);
+ final ExceptionHandler.OnError<FunctionContext, FlowFile> rollbackOnFailure = RollbackOnFailure.createOnError(defaultOnError);
+ rollbackOnFailure.apply(fc, flowFileWithAttributes, r, e);
+ });
+ };
+
+
@OnScheduled
public void onScheduled(final ProcessContext context) {
synchronized (this) {
schemaCache.clear();
}
+
+ process = new Put<>();
+
+ process.setLogger(getLogger());
+ process.initConnection(initConnection);
+ process.putFlowFile(putFlowFile);
+ process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY));
+
+ process.onCompleted((c, s, fc, conn) -> {
+ try {
+ conn.commit();
+ } catch (SQLException e) {
+ // Throw ProcessException to rollback process session.
+ throw new ProcessException("Failed to commit database connection due to " + e, e);
+ }
+ });
+
+ process.onFailed((c, s, fc, conn, e) -> {
+ try {
+ conn.rollback();
+ } catch (SQLException re) {
+ // Just log the fact that rollback failed.
+                // The ProcessSession will be rolled back by the thrown Exception, so we don't have to do anything here.
+ getLogger().warn("Failed to rollback database connection due to %s", new Object[]{re}, re);
+ }
+ });
+
+ process.cleanup((c, s, fc, conn) -> {
+ // make sure that we try to set the auto commit back to whatever it was.
+ if (fc.originalAutoCommit) {
+ try {
+ conn.setAutoCommit(true);
+ } catch (final SQLException se) {
+ getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se});
+ }
+ }
+ });
+
+ exceptionHandler = new ExceptionHandler<>();
+ exceptionHandler.mapException(s -> {
+
+ try {
+ if (s == null) {
+ return ErrorTypes.PersistentFailure;
+ }
+ throw s;
+
+ } catch (IllegalArgumentException
+ |MalformedRecordException
+ |SQLNonTransientException e) {
+ return ErrorTypes.InvalidInput;
+
+ } catch (IOException
+ |SQLException e) {
+ return ErrorTypes.TemporalFailure;
+
+ } catch (Exception e) {
+ return ErrorTypes.UnknownFailure;
+ }
+
+ });
+ exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger()));
}
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ private static class FunctionContext extends RollbackOnFailure {
+ private final int queryTimeout;
+ private boolean originalAutoCommit = false;
+ private String jdbcUrl;
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
+ public FunctionContext(boolean rollbackOnFailure, int queryTimeout) {
+ super(rollbackOnFailure, true);
+ this.queryTimeout = queryTimeout;
}
+ }
- final ComponentLog log = getLogger();
-
- final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
- .asControllerService(RecordReaderFactory.class);
- final String statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final boolean translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
- final boolean ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
- final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+ static class DMLSettings {
+ private final boolean translateFieldNames;
+ private final boolean ignoreUnmappedFields;
// Is the unmatched column behaviour fail or warning?
- final boolean failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
- final boolean warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+ private final boolean failUnmappedColumns;
+ private final boolean warningUnmappedColumns;
// Escape column names?
- final boolean escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
+ private final boolean escapeColumnNames;
// Quote table name?
- final boolean quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+ private final boolean quoteTableName;
+
+ private DMLSettings(ProcessContext context) {
+ translateFieldNames = context.getProperty(TRANSLATE_FIELD_NAMES).asBoolean();
+ ignoreUnmappedFields = IGNORE_UNMATCHED_FIELD.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_FIELD_BEHAVIOR).getValue());
+
+ failUnmappedColumns = FAIL_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+ warningUnmappedColumns = WARNING_UNMATCHED_COLUMN.getValue().equalsIgnoreCase(context.getProperty(UNMATCHED_COLUMN_BEHAVIOR).getValue());
+
+ escapeColumnNames = context.getProperty(QUOTED_IDENTIFIERS).asBoolean();
+ quoteTableName = context.getProperty(QUOTED_TABLE_IDENTIFIER).asBoolean();
+ }
+
+ }
- try (final Connection con = dbcpService.getConnection()) {
+ private void executeSQL(ProcessContext context, ProcessSession session,
+ FlowFile flowFile, FunctionContext functionContext, RoutingResult result,
+ Connection con, RecordReader recordParser)
+ throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
+
+ final RecordSchema recordSchema = recordParser.getSchema();
+
+ // Find which field has the SQL statement in it
+ final String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
+ if (StringUtils.isEmpty(sqlField)) {
+ throw new IllegalArgumentException(format("SQL specified as Statement Type but no Field Containing SQL was found, FlowFile %s", flowFile));
+ }
+
+ boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch((field) -> sqlField.equals(field.getFieldName()));
+ if (!schemaHasSqlField) {
+ throw new IllegalArgumentException(format("Record schema does not contain Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
+ }
+
+ try (Statement s = con.createStatement()) {
- final boolean originalAutoCommit = con.getAutoCommit();
try {
- con.setAutoCommit(false);
+ s.setQueryTimeout(functionContext.queryTimeout); // timeout in seconds
+ } catch (SQLException se) {
+ // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
+ if (functionContext.queryTimeout > 0) {
+ throw se;
+ }
+ }
- String jdbcURL = "DBCPService";
- try {
- DatabaseMetaData databaseMetaData = con.getMetaData();
- if (databaseMetaData != null) {
- jdbcURL = databaseMetaData.getURL();
- }
- } catch (SQLException se) {
- // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+ Record currentRecord;
+ while ((currentRecord = recordParser.nextRecord()) != null) {
+ Object sql = currentRecord.getValue(sqlField);
+ if (sql == null || StringUtils.isEmpty((String) sql)) {
+ throw new MalformedRecordException(format("Record had no (or null) value for Field Containing SQL: %s, FlowFile %s", sqlField, flowFile));
}
- final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
- final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
- final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, tableName);
+ // Execute the statement as-is
+ s.execute((String) sql);
+ }
+ result.routeTo(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
+ }
+ }
- // Get the statement type from the attribute if necessary
- String statementType = statementTypeProperty;
- if (USE_ATTR_TYPE.equals(statementTypeProperty)) {
- statementType = flowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
- }
- if (StringUtils.isEmpty(statementType)) {
- log.error("Statement Type is not specified, flowfile {} will be penalized and routed to failure", new Object[]{flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Statement Type not specified");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- RecordSchema recordSchema;
- try (final InputStream in = session.read(flowFile)) {
-
- final RecordReader recordParser = recordParserFactory.createRecordReader(flowFile, in, log);
- recordSchema = recordParser.getSchema();
-
- if (SQL_TYPE.equalsIgnoreCase(statementType)) {
-
- // Find which field has the SQL statement in it
- final String sqlField = context.getProperty(FIELD_CONTAINING_SQL).evaluateAttributeExpressions(flowFile).getValue();
- if (StringUtils.isEmpty(sqlField)) {
- log.error("SQL specified as Statement Type but no Field Containing SQL was found, flowfile {} will be penalized and routed to failure", new Object[]{flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Field Containing SQL not found");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } else {
- boolean schemaHasSqlField = recordSchema.getFields().stream().anyMatch((field) -> sqlField.equals(field.getFieldName()));
- if (schemaHasSqlField) {
- try (Statement s = con.createStatement()) {
-
- try {
- s.setQueryTimeout(queryTimeout); // timeout in seconds
- } catch (SQLException se) {
- // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
- if (queryTimeout > 0) {
- throw se;
- }
- }
-
- Record currentRecord;
- while ((currentRecord = recordParser.nextRecord()) != null) {
- Object sql = currentRecord.getValue(sqlField);
- if (sql != null && !StringUtils.isEmpty((String) sql)) {
- // Execute the statement as-is
- s.execute((String) sql);
- } else {
- log.error("Record had no (or null) value for Field Containing SQL: {}, flowfile {} will be penalized and routed to failure",
- new Object[]{sqlField, flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Field Containing SQL missing value");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- }
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().send(flowFile, jdbcURL);
- } catch (final SQLNonTransientException e) {
- log.error("Failed to update database for {} due to {}; rolling back database and routing to failure", new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } catch (final SQLException e) {
- log.error("Failed to update database for {} due to {}; rolling back database. It is possible that retrying the operation will succeed, so routing to retry",
- new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- }
- } else {
- log.error("Record schema does not contain Field Containing SQL: {}, flowfile {} will be penalized and routed to failure", new Object[]{sqlField, flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Record schema missing Field Containing SQL value");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
- }
+ private void executeDML(ProcessContext context, ProcessSession session, FlowFile flowFile,
+ FunctionContext functionContext, RoutingResult result, Connection con,
+ RecordReader recordParser, String statementType, DMLSettings settings)
+ throws IllegalArgumentException, MalformedRecordException, IOException, SQLException {
- } else {
- // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
- if (StringUtils.isEmpty(tableName)) {
- log.error("Cannot process {} because Table Name is null or empty; penalizing and routing to failure", new Object[]{flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Table Name missing");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
- // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
- final boolean includePrimaryKeys = (updateKeys == null);
-
- // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
- // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
- // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
- // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
- TableSchema schema;
- synchronized (this) {
- schema = schemaCache.get(schemaKey);
- if (schema == null) {
- // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
- try (final Connection conn = dbcpService.getConnection()) {
- schema = TableSchema.from(conn, catalog, schemaName, tableName, translateFieldNames, includePrimaryKeys);
- schemaCache.put(schemaKey, schema);
- } catch (final SQLNonTransientException e) {
- log.error("Failed to update database for {} due to {}; routing to failure", new Object[]{flowFile, e}, e);
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- } catch (final SQLException e) {
- log.error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
- new Object[]{flowFile, e}, e);
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- return;
- }
- }
- }
-
- final SqlAndIncludedColumns sqlHolder;
- try {
- // build the fully qualified table name
- final StringBuilder tableNameBuilder = new StringBuilder();
- if (catalog != null) {
- tableNameBuilder.append(catalog).append(".");
- }
- if (schemaName != null) {
- tableNameBuilder.append(schemaName).append(".");
- }
- tableNameBuilder.append(tableName);
- final String fqTableName = tableNameBuilder.toString();
-
- if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateInsert(recordSchema, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
- failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
- } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, schema, translateFieldNames, ignoreUnmappedFields,
- failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
- } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
- sqlHolder = generateDelete(recordSchema, fqTableName, schema, translateFieldNames, ignoreUnmappedFields,
- failUnmappedColumns, warningUnmappedColumns, escapeColumnNames, quoteTableName);
- } else {
- log.error("Statement Type {} is not valid, flowfile {} will be penalized and routed to failure", new Object[]{statementType, flowFile});
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, "Statement Type invalid");
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
- } catch (final ProcessException pe) {
- log.error("Failed to convert {} to a SQL {} statement due to {}; routing to failure",
- new Object[]{flowFile, statementType, pe.toString()}, pe);
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, pe.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
-
- try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql())) {
-
- try {
- ps.setQueryTimeout(queryTimeout); // timeout in seconds
- } catch (SQLException se) {
- // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
- if (queryTimeout > 0) {
- throw se;
- }
- }
-
- Record currentRecord;
- List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
-
- while ((currentRecord = recordParser.nextRecord()) != null) {
- Object[] values = currentRecord.getValues();
- if (values != null) {
- if (fieldIndexes != null) {
- for (int i = 0; i < fieldIndexes.size(); i++) {
- ps.setObject(i + 1, values[fieldIndexes.get(i)]);
- }
- } else {
- // If there's no index map, assume all values are included and set them in order
- for (int i = 0; i < values.length; i++) {
- ps.setObject(i + 1, values[i]);
- }
- }
- ps.addBatch();
- }
- }
-
- log.debug("Executing query {}", new Object[]{sqlHolder});
- ps.executeBatch();
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().send(flowFile, jdbcURL);
-
- } catch (final SQLNonTransientException | BatchUpdateException e) {
- log.error("Failed to update database for {} due to {}; rolling back database, routing to failure", new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- } catch (final SQLException e) {
- log.error("Failed to update database for {} due to {}; rolling back database. It is possible that retrying the operation will succeed, so routing to retry",
- new Object[]{flowFile, e}, e);
- try {
- con.rollback();
- } catch (SQLException se) {
- log.error("Failed to rollback database, transaction may be incomplete.", se);
- }
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_RETRY);
- }
- }
- } catch (final MalformedRecordException | SchemaNotFoundException | IOException e) {
- log.error("Failed to determine schema of data records for {}, routing to failure", new Object[]{flowFile}, e);
+ final RecordSchema recordSchema = recordParser.getSchema();
+ final ComponentLog log = getLogger();
- flowFile = session.putAttribute(flowFile, PUT_DATABASE_RECORD_ERROR, e.getMessage());
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
- }
+ final String catalog = context.getProperty(CATALOG_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
+ final SchemaKey schemaKey = new PutDatabaseRecord.SchemaKey(catalog, schemaName, tableName);
+
+ // Ensure the table name has been set, the generated SQL statements (and TableSchema cache) will need it
+ if (StringUtils.isEmpty(tableName)) {
+ throw new IllegalArgumentException(format("Cannot process %s because Table Name is null or empty", flowFile));
+ }
+
+ // Always get the primary keys if Update Keys is empty. Otherwise if we have an Insert statement first, the table will be
+ // cached but the primary keys will not be retrieved, causing future UPDATE statements to not have primary keys available
+ final boolean includePrimaryKeys = updateKeys == null;
+
+ // get the database schema from the cache, if one exists. We do this in a synchronized block, rather than
+ // using a ConcurrentMap because the Map that we are using is a LinkedHashMap with a capacity such that if
+ // the Map grows beyond this capacity, old elements are evicted. We do this in order to avoid filling the
+ // Java Heap if there are a lot of different SQL statements being generated that reference different tables.
+ TableSchema tableSchema;
+ synchronized (this) {
+ tableSchema = schemaCache.get(schemaKey);
+ if (tableSchema == null) {
+ // No schema exists for this table yet. Query the database to determine the schema and put it into the cache.
+ tableSchema = TableSchema.from(con, catalog, schemaName, tableName, settings.translateFieldNames, includePrimaryKeys);
+ schemaCache.put(schemaKey, tableSchema);
+ }
+ }
+ if (tableSchema == null) {
+ throw new IllegalArgumentException("No table schema specified!");
+ }
+
+ // build the fully qualified table name
+ final StringBuilder tableNameBuilder = new StringBuilder();
+ if (catalog != null) {
+ tableNameBuilder.append(catalog).append(".");
+ }
+ if (schemaName != null) {
+ tableNameBuilder.append(schemaName).append(".");
+ }
+ tableNameBuilder.append(tableName);
+ final String fqTableName = tableNameBuilder.toString();
+
+ if (recordSchema == null) {
+ throw new IllegalArgumentException("No record schema specified!");
+ }
+
+ final SqlAndIncludedColumns sqlHolder;
+ if (INSERT_TYPE.equalsIgnoreCase(statementType)) {
+ sqlHolder = generateInsert(recordSchema, fqTableName, tableSchema, settings);
+
+ } else if (UPDATE_TYPE.equalsIgnoreCase(statementType)) {
+ sqlHolder = generateUpdate(recordSchema, fqTableName, updateKeys, tableSchema, settings);
+
+ } else if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
+ sqlHolder = generateDelete(recordSchema, fqTableName, tableSchema, settings);
+
+ } else {
+ throw new IllegalArgumentException(format("Statement Type %s is not valid, FlowFile %s", statementType, flowFile));
+ }
+
+ try (PreparedStatement ps = con.prepareStatement(sqlHolder.getSql())) {
+
+ final int queryTimeout = functionContext.queryTimeout;
+ try {
+ ps.setQueryTimeout(queryTimeout); // timeout in seconds
+ } catch (SQLException se) {
+ // If the driver doesn't support query timeout, then assume it is "infinite". Allow a timeout of zero only
+ if (queryTimeout > 0) {
+ throw se;
}
- } finally {
- try {
- con.commit();
- } finally {
- // make sure that we try to set the auto commit back to whatever it was.
- if (originalAutoCommit) {
- try {
- con.setAutoCommit(originalAutoCommit);
- } catch (final SQLException se) {
- // Nothing to do if it didn't work, indicates an issue with the driver
+ }
+
+ Record currentRecord;
+ List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();
+
+ while ((currentRecord = recordParser.nextRecord()) != null) {
+ Object[] values = currentRecord.getValues();
+ if (values != null) {
+ if (fieldIndexes != null) {
+ for (int i = 0; i < fieldIndexes.size(); i++) {
+ ps.setObject(i + 1, values[fieldIndexes.get(i)]);
+ }
+ } else {
+ // If there's no index map, assume all values are included and set them in order
+ for (int i = 0; i < values.length; i++) {
+ ps.setObject(i + 1, values[i]);
}
}
+ ps.addBatch();
}
}
- } catch (final ProcessException | SQLException e) {
- log.error("Error occurred during processing, yielding the processor", e);
- context.yield();
+
+ log.debug("Executing query {}", new Object[]{sqlHolder});
+ ps.executeBatch();
+ result.routeTo(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile, functionContext.jdbcUrl);
+
}
}
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+ final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean();
+ final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+
+ final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, queryTimeout);
+
+ RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext));
+
+ }
+
private Set<String> getNormalizedColumnNames(final RecordSchema schema, final boolean translateFieldNames) {
final Set<String> normalizedFieldNames = new HashSet<>();
if (schema != null) {
@@ -636,27 +686,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
return normalizedFieldNames;
}
- SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName,
- final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
- final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
-
- if (recordSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
- if (tableSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
+ SqlAndIncludedColumns generateInsert(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
+ throws IllegalArgumentException, SQLException {
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
- final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+ final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
- if (failUnmappedColumns) {
+ if (settings.failUnmappedColumns) {
getLogger().error(missingColMessage);
- throw new ProcessException(missingColMessage);
- } else if (warningUnmappedColumns) {
+ throw new IllegalArgumentException(missingColMessage);
+ } else if (settings.warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
@@ -664,7 +706,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("INSERT INTO ");
- if (quoteTableName) {
+ if (settings.quoteTableName) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
@@ -680,14 +722,13 @@ public class PutDatabaseRecord extends AbstractProcessor {
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
-
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
- if (desc == null && !ignoreUnmappedFields) {
- throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
+ if (desc == null && !settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
}
if (desc != null) {
@@ -695,7 +736,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(", ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
@@ -704,7 +745,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
includedColumns.add(i);
}
- });
+ }
// complete the SQL statements by adding ?'s for all of the values to be escaped.
sqlBuilder.append(") VALUES (");
@@ -712,22 +753,15 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(")");
if (fieldsFound.get() == 0) {
- throw new ProcessException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+ throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
}
}
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
SqlAndIncludedColumns generateUpdate(final RecordSchema recordSchema, final String tableName, final String updateKeys,
- final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
- final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
-
- if (recordSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
- if (tableSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
+ final TableSchema tableSchema, final DMLSettings settings)
+ throws IllegalArgumentException, MalformedRecordException, SQLException {
final Set<String> updateKeyNames;
if (updateKeys == null) {
@@ -740,12 +774,12 @@ public class PutDatabaseRecord extends AbstractProcessor {
}
if (updateKeyNames.isEmpty()) {
- throw new ProcessException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
+ throw new SQLIntegrityConstraintViolationException("Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified");
}
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE ");
- if (quoteTableName) {
+ if (settings.quoteTableName) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
@@ -755,18 +789,18 @@ public class PutDatabaseRecord extends AbstractProcessor {
// Create a Set of all normalized Update Key names, and ensure that there is a field in the record
// for each of the Update Key fields.
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
for (final String uk : updateKeyNames) {
- final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
+ final String normalizedUK = normalizeColumnName(uk, settings.translateFieldNames);
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
String missingColMessage = "Record does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + uk + "'";
- if (failUnmappedColumns) {
+ if (settings.failUnmappedColumns) {
getLogger().error(missingColMessage);
- throw new ProcessException(missingColMessage);
- } else if (warningUnmappedColumns) {
+ throw new MalformedRecordException(missingColMessage);
+ } else if (settings.warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
@@ -781,18 +815,17 @@ public class PutDatabaseRecord extends AbstractProcessor {
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
-
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+ final String normalizedColName = normalizeColumnName(fieldName, settings.translateFieldNames);
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc == null) {
- if (!ignoreUnmappedFields) {
- throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+ if (!settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
} else {
- return;
+ continue;
}
}
@@ -803,7 +836,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(", ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
@@ -814,19 +847,19 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(" = ?");
includedColumns.add(i);
}
- });
+ }
// Set the WHERE clause based on the Update Key values
sqlBuilder.append(" WHERE ");
AtomicInteger whereFieldCount = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
+ final String normalizedColName = normalizeColumnName(fieldName, settings.translateFieldNames);
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
if (desc != null) {
// Check if this column is a Update Key. If so, add it to the WHERE clause
@@ -836,7 +869,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(" AND ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(normalizedColName)
.append(tableSchema.getQuotedIdentifierString());
@@ -847,31 +880,23 @@ public class PutDatabaseRecord extends AbstractProcessor {
includedColumns.add(i);
}
}
- });
+ }
}
return new SqlAndIncludedColumns(sqlBuilder.toString(), includedColumns);
}
- SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName,
- final TableSchema tableSchema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
- final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName) {
+ SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final String tableName, final TableSchema tableSchema, final DMLSettings settings)
+ throws IllegalArgumentException, MalformedRecordException, SQLDataException {
- if (recordSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
- if (tableSchema == null) {
- throw new ProcessException("No table schema specified!");
- }
-
- final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, translateFieldNames);
+ final Set<String> normalizedFieldNames = getNormalizedColumnNames(recordSchema, settings.translateFieldNames);
for (final String requiredColName : tableSchema.getRequiredColumnNames()) {
- final String normalizedColName = normalizeColumnName(requiredColName, translateFieldNames);
+ final String normalizedColName = normalizeColumnName(requiredColName, settings.translateFieldNames);
if (!normalizedFieldNames.contains(normalizedColName)) {
String missingColMessage = "Record does not have a value for the Required column '" + requiredColName + "'";
- if (failUnmappedColumns) {
+ if (settings.failUnmappedColumns) {
getLogger().error(missingColMessage);
- throw new ProcessException(missingColMessage);
- } else if (warningUnmappedColumns) {
+ throw new MalformedRecordException(missingColMessage);
+ } else if (settings.warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}
@@ -879,7 +904,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("DELETE FROM ");
- if (quoteTableName) {
+ if (settings.quoteTableName) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(tableName)
.append(tableSchema.getQuotedIdentifierString());
@@ -895,14 +920,14 @@ public class PutDatabaseRecord extends AbstractProcessor {
int fieldCount = fieldNames.size();
AtomicInteger fieldsFound = new AtomicInteger(0);
- IntStream.range(0, fieldCount).forEach((i) -> {
+ for (int i = 0; i < fieldCount; i++) {
RecordField field = recordSchema.getField(i);
String fieldName = field.getFieldName();
- final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, translateFieldNames));
- if (desc == null && !ignoreUnmappedFields) {
- throw new ProcessException("Cannot map field '" + fieldName + "' to any column in the database");
+ final ColumnDescription desc = tableSchema.getColumns().get(normalizeColumnName(fieldName, settings.translateFieldNames));
+ if (desc == null && !settings.ignoreUnmappedFields) {
+ throw new SQLDataException("Cannot map field '" + fieldName + "' to any column in the database");
}
if (desc != null) {
@@ -910,7 +935,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
sqlBuilder.append(" AND ");
}
- if (escapeColumnNames) {
+ if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
@@ -921,10 +946,10 @@ public class PutDatabaseRecord extends AbstractProcessor {
includedColumns.add(i);
}
- });
+ }
if (fieldsFound.get() == 0) {
- throw new ProcessException("None of the fields in the record map to the columns defined by the " + tableName + " table");
+ throw new SQLDataException("None of the fields in the record map to the columns defined by the " + tableName + " table");
}
}
@@ -985,7 +1010,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
final Set<String> primaryKeyColumns = new HashSet<>();
if (includePrimaryKeys) {
- try (final ResultSet pkrs = conn.getMetaData().getPrimaryKeys(catalog, null, tableName)) {
+ try (final ResultSet pkrs = dmd.getPrimaryKeys(catalog, null, tableName)) {
while (pkrs.next()) {
final String colName = pkrs.getString("COLUMN_NAME");