You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/27 21:53:56 UTC
[1/2] nifi git commit: NIFI-977: Allow SQL Data Types with numerals
that are negative
Repository: nifi
Updated Branches:
refs/heads/master 4c0cf7d72 -> dc4004de6
NIFI-977: Allow SQL Data Types with numerals that are negative
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84db3725
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84db3725
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84db3725
Branch: refs/heads/master
Commit: 84db3725386fe44bbef8a18d84852a4f716addb6
Parents: 49ee06b
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Oct 14 13:12:10 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Oct 14 13:12:10 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/standard/PutSQL.java | 156 +++++++++----------
.../nifi/processors/standard/TestPutSQL.java | 46 +++++-
2 files changed, 119 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/84db3725/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index b087737..5c2bbc2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@ -70,81 +70,81 @@ import org.apache.nifi.stream.io.StreamUtils;
@SeeAlso(ConvertJSONToSQL.class)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
- + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
- + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
- + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+ + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes({
- @ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
- + "not two FlowFiles belong to the same transaction."),
- @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
- + "are needed to complete the transaction."),
- @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
- + "in a transaction should be evaluated."),
- @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
- + "that represents the JDBC Type of the parameter."),
- @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
- + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
+ @ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+ + "not two FlowFiles belong to the same transaction."),
+ @ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ + "are needed to complete the transaction."),
+ @ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ + "in a transaction should be evaluated."),
+ @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ + "that represents the JDBC Type of the parameter."),
+ @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+ + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
})
@WritesAttributes({
- @WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
- + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
+ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
})
public class PutSQL extends AbstractProcessor {
static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
- .name("JDBC Connection Pool")
- .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
- + "The Connection Pool is necessary in order to determine the appropriate database column types.")
- .identifiesControllerService(DBCPService.class)
- .required(true)
- .build();
+ .name("JDBC Connection Pool")
+ .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. "
+ + "The Connection Pool is necessary in order to determine the appropriate database column types.")
+ .identifiesControllerService(DBCPService.class)
+ .required(true)
+ .build();
static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
- .name("Support Fragmented Transactions")
- .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
- + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; "
- + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
- + "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
- .allowableValues("true", "false")
- .defaultValue("true")
- .build();
+ .name("Support Fragmented Transactions")
+ .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. "
+ + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; "
+ + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. "
+ + "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Transaction Timeout")
- .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
- + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
- .required(false)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .build();
+ .name("Transaction Timeout")
+ .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute "
+ + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The preferred number of FlowFiles to put to the database in a single transaction")
- .required(true)
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .defaultValue("100")
- .build();
+ .name("Batch Size")
+ .description("The preferred number of FlowFiles to put to the database in a single transaction")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("100")
+ .build();
static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
- .name("Obtain Generated Keys")
- .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. "
- + "This may result in slightly slower performance and is not supported by all databases.")
- .allowableValues("true", "false")
- .defaultValue("false")
- .build();
+ .name("Obtain Generated Keys")
+ .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. "
+ + "This may result in slightly slower performance and is not supported by all databases.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("A FlowFile is routed to this relationship after the database is successfully updated")
- .build();
+ .name("success")
+ .description("A FlowFile is routed to this relationship after the database is successfully updated")
+ .build();
static final Relationship REL_RETRY = new Relationship.Builder()
- .name("retry")
- .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
- .build();
+ .name("retry")
+ .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+ .build();
static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
- + "such as an invalid query or an integrity constraint violation")
- .build();
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, "
+ + "such as an invalid query or an integrity constraint violation")
+ .build();
private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type");
- private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+ private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
private static final String FRAGMENT_ID_ATTR = "fragment.identifier";
private static final String FRAGMENT_INDEX_ATTR = "fragment.index";
@@ -187,7 +187,7 @@ public class PutSQL extends AbstractProcessor {
final long startNanos = System.nanoTime();
final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();
final Map<String, StatementFlowFileEnclosure> statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles
- final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
+ final List<FlowFile> sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent
final List<FlowFile> processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed
final Set<StatementFlowFileEnclosure> enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed
@@ -286,7 +286,7 @@ public class PutSQL extends AbstractProcessor {
conn.rollback();
final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex);
getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. "
- + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
+ + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e});
session.transfer(flowFiles, REL_FAILURE);
return;
}
@@ -300,7 +300,7 @@ public class PutSQL extends AbstractProcessor {
int failureCount = 0;
int successCount = 0;
int retryCount = 0;
- for (int i=0; i < updateCounts.length; i++) {
+ for (int i = 0; i < updateCounts.length; i++) {
final int updateCount = updateCounts[i];
final FlowFile flowFile = batchFlowFiles.get(i);
if (updateCount == Statement.EXECUTE_FAILED) {
@@ -329,7 +329,7 @@ public class PutSQL extends AbstractProcessor {
}
getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, "
- + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
+ + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount});
} catch (final SQLNonTransientException e) {
getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e});
@@ -339,7 +339,7 @@ public class PutSQL extends AbstractProcessor {
continue;
} catch (final SQLException e) {
getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry",
- new Object[] {enclosure.getFlowFiles(), e});
+ new Object[] {enclosure.getFlowFiles(), e});
for (final FlowFile flowFile : enclosure.getFlowFiles()) {
destinationRelationships.put(flowFile, REL_RETRY);
@@ -484,7 +484,7 @@ public class PutSQL extends AbstractProcessor {
*
* @param stmt the statement that generated a key
* @return the key that was generated from the given statement, or <code>null</code> if no key
- * was generated or it could not be determined.
+ * was generated or it could not be determined.
*/
private String determineGeneratedKey(final PreparedStatement stmt) {
try {
@@ -514,7 +514,7 @@ public class PutSQL extends AbstractProcessor {
* @throws SQLException if unable to create the appropriate PreparedStatement
*/
private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map<String, StatementFlowFileEnclosure> stmtMap,
- final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
+ final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException {
StatementFlowFileEnclosure enclosure = stmtMap.get(sql);
if (enclosure != null) {
return enclosure;
@@ -620,9 +620,9 @@ public class PutSQL extends AbstractProcessor {
*
* @param flowFiles the FlowFiles whose relationship is to be determined
* @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait
- * for all FlowFiles in a transaction to be present before routing to failure
+ * for all FlowFiles in a transaction to be present before routing to failure
* @return the appropriate relationship to route the FlowFiles to, or <code>null</code> if the FlowFiles
- * should instead be processed
+ * should instead be processed
*/
Relationship determineRelationship(final List<FlowFile> flowFiles, final Long transactionTimeoutMillis) {
int selectedNumFragments = 0;
@@ -634,7 +634,7 @@ public class PutSQL extends AbstractProcessor {
return null;
} else if (fragmentCount == null) {
getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier "
- + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
+ + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()});
return REL_FAILURE;
}
@@ -643,13 +643,13 @@ public class PutSQL extends AbstractProcessor {
numFragments = Integer.parseInt(fragmentCount);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE;
}
if (numFragments < 1) {
getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount});
return REL_FAILURE;
}
@@ -657,14 +657,14 @@ public class PutSQL extends AbstractProcessor {
selectedNumFragments = numFragments;
} else if (numFragments != selectedNumFragments) {
getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR);
if (fragmentIndex == null) {
getLogger().error("Cannot process {} because the fragment.index attribute is missing; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
@@ -673,19 +673,19 @@ public class PutSQL extends AbstractProcessor {
idx = Integer.parseInt(fragmentIndex);
} catch (final NumberFormatException nfe) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE;
}
if (idx < 0) {
getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex});
return REL_FAILURE;
}
if (bitSet.get(idx)) {
getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; "
- + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
+ + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile});
return REL_FAILURE;
}
@@ -693,7 +693,7 @@ public class PutSQL extends AbstractProcessor {
}
if (selectedNumFragments == flowFiles.size()) {
- return null; // no relationship to route FlowFiles to yet - process the FlowFiles.
+ return null; // no relationship to route FlowFiles to yet - process the FlowFiles.
}
long latestQueueTime = 0L;
@@ -711,7 +711,7 @@ public class PutSQL extends AbstractProcessor {
}
getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
- return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue.
+ return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue.
}
/**
@@ -769,7 +769,7 @@ public class PutSQL extends AbstractProcessor {
break;
default:
throw new SQLException("The '" + attrName + "' attribute has a value of '" + parameterValue
- + "' and a type of '" + jdbcType + "' but this is not a known data type");
+ + "' and a type of '" + jdbcType + "' but this is not a known data type");
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/84db3725/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
index a348c9e..17506f7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutSQL.java
@@ -103,6 +103,7 @@ public class TestPutSQL {
}
}
+
@Test
public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException, IOException {
final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
@@ -158,7 +159,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes());
- runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
+ runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes());
runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes());
runner.run();
@@ -256,6 +257,41 @@ public class TestPutSQL {
}
+ @Test
+ public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
+ final File tempDir = folder.getRoot();
+ final File dbDir = new File(tempDir, "db");
+ final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath());
+ runner.addControllerService("dbcp", service);
+ runner.enableControllerService(service);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("CREATE TABLE PERSONS (id integer primary key, name varchar(100), code bigint)");
+ }
+ }
+
+ runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("sql.args.1.type", "-5");
+ attributes.put("sql.args.1.value", "84");
+ runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
+
+ try (final Connection conn = service.getConnection()) {
+ try (final Statement stmt = conn.createStatement()) {
+ final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Mark", rs.getString(2));
+ assertEquals(84, rs.getInt(3));
+ assertFalse(rs.next());
+ }
+ }
+ }
@Test
public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException, IOException {
@@ -343,7 +379,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
- "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@@ -432,7 +468,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
- "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
+ "UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@@ -471,7 +507,7 @@ public class TestPutSQL {
runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " +
- "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
+ "UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
final Map<String, String> attributes = new HashMap<>();
attributes.put("sql.args.1.type", String.valueOf(Types.INTEGER));
attributes.put("sql.args.1.value", "1");
@@ -579,7 +615,7 @@ public class TestPutSQL {
final MockFlowFile mff = new MockFlowFile(0L) {
@Override
public Long getLastQueueDate() {
- return System.currentTimeMillis() - 10000L; // return 10 seconds ago
+ return System.currentTimeMillis() - 10000L; // return 10 seconds ago
}
@Override
[2/2] nifi git commit: Merge branch 'NIFI-977'
Posted by ma...@apache.org.
Merge branch 'NIFI-977'
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/dc4004de
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/dc4004de
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/dc4004de
Branch: refs/heads/master
Commit: dc4004de64d10ba82298eb1f6ecb5924fd0afb62
Parents: 4c0cf7d 84db372
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Oct 27 16:53:38 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Oct 27 16:53:38 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/processors/standard/PutSQL.java | 156 +++++++++----------
.../nifi/processors/standard/TestPutSQL.java | 46 +++++-
2 files changed, 119 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/dc4004de/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
----------------------------------------------------------------------
diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
index 0913f86,5c2bbc2..c2056fe
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSQL.java
@@@ -70,27 -68,26 +70,27 @@@ import org.apache.nifi.stream.io.Stream
@SupportsBatching
@SeeAlso(ConvertJSONToSQL.class)
+@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command "
- + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
- + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
- + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
+ + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes "
+ + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+ + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes({
- @ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
- + "not two FlowFiles belong to the same transaction."),
- @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
- + "are needed to complete the transaction."),
- @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
- + "in a transaction should be evaluated."),
- @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
- + "that represents the JDBC Type of the parameter."),
- @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
- + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
+ @ReadsAttribute(attribute = "fragment.identifier", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or "
+ + "not two FlowFiles belong to the same transaction."),
+ @ReadsAttribute(attribute = "fragment.count", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles "
+ + "are needed to complete the transaction."),
+ @ReadsAttribute(attribute = "fragment.index", description = "If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles "
+ + "in a transaction should be evaluated."),
+ @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+ + "that represents the JDBC Type of the parameter."),
+ @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as "
+ + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute.")
})
@WritesAttributes({
- @WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
- + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
+ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, "
+ + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")
})
public class PutSQL extends AbstractProcessor {