Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/01/10 12:14:38 UTC

[GitHub] [nifi] woutifier-t opened a new pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

woutifier-t opened a new pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977
 
 
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
   #### Description of PR
   
   _Enables UPDATE functionality in the PutCassandraRecord processor; fixes bug NIFI-7007._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [x] Has your PR been rebased against the latest commit within the target branch (typically `master`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on both JDK 8 and JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [x] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375773901
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -255,14 +257,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 throw new IllegalArgumentException(format("Statement Type is not specified, FlowFile %s", inputFlowFile));
             }
 
-            // throw an exception if the statement type is set to update and updateMethod or updateKeys is empty
-            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
-                if (StringUtils.isEmpty(updateMethod)) {
-                    throw new IllegalArgumentException(format("Update Method is not specified, FlowFile %s", inputFlowFile));
-                }
-                if (StringUtils.isEmpty(updateKeys)) {
+            // throw an exception if the statement type is set to update and updateKeys is empty
+            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && StringUtils.isEmpty(updateKeys)) {
 
 Review comment:
   Checking `updateMethod` _here_ actually makes sense because it can come from a flowfile attribute and can be null/empty.
   I'd leave this check as it was.
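For illustration, the runtime check discussed in this comment can be sketched in plain Java. This is a minimal sketch, not the PR's code: the class and method names are hypothetical, and the literal "UPDATE" stands in for the processor's statement-type constant. It assumes the values have already been resolved from the processor properties or flowfile attributes, which is why they may still be null or empty at this point.

```java
class UpdateSettingsCheck {
    // Hypothetical helper: rejects an UPDATE whose method or keys are missing.
    // Values that come from flowfile attributes cannot be validated at
    // configuration time, so the check has to run per flowfile.
    static void requireUpdateSettings(String statementType, String updateMethod, String updateKeys) {
        if (!"UPDATE".equalsIgnoreCase(statementType)) {
            return; // only UPDATE statements need these two values
        }
        if (updateMethod == null || updateMethod.isEmpty()) {
            throw new IllegalArgumentException("Update Method is not specified");
        }
        if (updateKeys == null || updateKeys.isEmpty()) {
            throw new IllegalArgumentException("Update Keys are not specified");
        }
    }
}
```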


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375287946
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -129,8 +205,39 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
 
 Review comment:
   this is now implemented.


[GitHub] [nifi] Woutifier edited a comment on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
Woutifier edited a comment on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-579858760
 
 
   > I tested it with a running Cassandra and it seems there are some issues.
   > The `PutCassandraRecord` uses batch statements but it seems you cannot use that for `counter` type columns:
   > 
   > ```java
   > ore.exceptions.InvalidQueryException: Cannot include a counter statement in a logged batch: com.datastax.driver.core.exceptions.InvalidQueryException: Cannot include a counter statement in a logged batch
   > com.datastax.driver.core.exceptions.InvalidQueryException: Cannot include a counter statement in a logged batch
   >         at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
   >         at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
   >         at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
   >         at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
   >         at org.apache.nifi.processors.cassandra.PutCassandraRecord.onTrigger(PutCassandraRecord.java:285)
   > ```
   > 
   > Maybe we don't want to start supporting counters, but `Increment` and `Decrement` can only be used for those. And if we remove those, the UPDATE SET is basically identical to an INSERT.
   
   Hey @tpalfy 
   
   COUNTER is just another batch statement type, which I've already added. Just make sure that you select COUNTER in the Batch Statement Type (can also be supplied via cql.batch.statement.type if required).
   
   (posted from my personal account as I don't have access to the other one from home)
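To illustrate the constraint behind the exception quoted above: the customValidate hunk in this PR notes that increment and decrement require the Batch Statement Type to be unlogged or counter. A minimal sketch of that rule in plain Java follows. The class name, the method name, and the literal strings ("INCREMENT", "DECREMENT", "COUNTER", "UNLOGGED") are assumptions for illustration, not the PR's actual constants.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

class BatchTypeCheck {
    // Assumed values; the PR reads these from its AllowableValue constants.
    static final Set<String> COUNTER_SAFE_BATCH_TYPES =
            new HashSet<>(Arrays.asList("COUNTER", "UNLOGGED"));

    // Returns true when the update method may be used with the given batch type.
    static boolean isCompatible(String updateMethod, String batchStatementType) {
        boolean counterUpdate = "INCREMENT".equalsIgnoreCase(updateMethod)
                || "DECREMENT".equalsIgnoreCase(updateMethod);
        if (!counterUpdate) {
            return true; // a plain SET is not restricted by this rule
        }
        return batchStatementType != null
                && COUNTER_SAFE_BATCH_TYPES.contains(batchStatementType.toUpperCase(Locale.ROOT));
    }
}
```

With these assumptions, an increment paired with a logged batch is rejected before anything is sent to Cassandra, rather than surfacing later as an InvalidQueryException.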


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375287233
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -362,6 +381,38 @@ private Long convertFieldObjectToLong(String name, Object value) {
         return ((Number) value).longValue();
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys and update method are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+            if (StringUtils.isEmpty(updateMethod)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Method must be specified as well").build());
+            }
+
+            // Check that if the update method is set to increment or decrement that the batch statement type is set to
+            // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point).
+            String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
+            if (!Set.of(COUNTER_TYPE.getValue(), UNLOGGED_TYPE.getValue(), BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue()).contains(batchStatementType)) {
 
 Review comment:
   Done, thanks for noticing this.


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r374602747
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -129,8 +210,39 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
 
 Review comment:
   I've added a unit test for this, and added a customValidate to check it.


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368522537
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +75,35 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values.")
 
 Review comment:
   I've modified Statement Type, Update Method, and Batch Statement Type (which for counter tables can only be counter) so that each can also take its value from an attribute.


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369023279
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that can be used
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
+                        throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Long, and cannot be used" +
+                                " to increment or decrement.");
+                    }
+
+                    if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.incr(fieldName, (Long)fieldValue);
+                    } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.decr(fieldName, (Long)fieldValue);
+                    } else {
+                        throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
 
 Review comment:
   Agreed.
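As a reading aid for the hunk above, the key handling in generateUpdate can be sketched without the Cassandra driver: parse the comma-separated Update Keys, verify each key exists in the schema, then split the field names into those used as WHERE conditions and those that are assigned. This is a minimal sketch in plain Java. The class and method names are hypothetical, and it only partitions names rather than building a driver Statement.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class UpdateKeySplit {
    // Splits "a, b,,c" into an ordered set of trimmed, non-empty names.
    static Set<String> parseUpdateKeys(String updateKeys) {
        Set<String> names = Arrays.stream(updateKeys.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
        if (names.isEmpty()) {
            throw new IllegalArgumentException("No Update Keys were specified");
        }
        return names;
    }

    // Result: true -> fields used as WHERE conditions, false -> fields to assign.
    static Map<Boolean, List<String>> partitionFields(List<String> schemaFieldNames, Set<String> updateKeyNames) {
        for (String key : updateKeyNames) {
            if (!schemaFieldNames.contains(key)) {
                throw new IllegalArgumentException("Update key '" + key + "' is not present in the record schema");
            }
        }
        return schemaFieldNames.stream()
                .collect(Collectors.partitioningBy(updateKeyNames::contains));
    }
}
```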


[GitHub] [nifi] woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-577199911
 
 
   That looks great! I'll prepare a commit adding those.


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375168357
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -362,6 +381,38 @@ private Long convertFieldObjectToLong(String name, Object value) {
         return ((Number) value).longValue();
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys and update method are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+            if (StringUtils.isEmpty(updateMethod)) {
 
 Review comment:
   Minor: It's impossible for the `updateMethod` to be empty and reach `customValidate`. (The property is assigned allowed values.)
   I would remove this check. (Current tests don't cover it either.)


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369426745
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that can be used
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
+                        throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Long, and cannot be used" +
+                                " to increment or decrement.");
+                    }
+
+                    if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.incr(fieldName, (Long)fieldValue);
+                    } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.decr(fieldName, (Long)fieldValue);
+                    } else {
+                        throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
 
 Review comment:
   Ah, no I meant that I see your point and will push a commit to that effect :-).



[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366564750
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +75,35 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values.")
 
 Review comment:
   This should include in the description that the property is only used if the Statement Type is Update, and ignored otherwise.
   
   Also, do you anticipate multiple flow files coming in, each needing a different Update Method? You can't use Expression Language with AllowableValues/dropdowns, but a common pattern is to have an AllowableValue whose display name is `Use cql.update.method attribute`; if that is chosen, the `cql.update.method` attribute is expected to be set to the name of one of the available options. See PutDatabaseRecord for an example of this pattern. The same goes for the Statement Type: if you allow it to be set by an attribute, you could send all flow files to one PutCassandraRecord instance rather than routing on type and sending to 2+ instances (e.g., one for INSERT and one for UPDATE).
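The attribute-backed dropdown pattern described in this comment can be sketched in plain Java, independent of the NiFi API. This is a minimal sketch under stated assumptions: the class name, the sentinel value "USE_ATTR", and the option names "SET", "INCREMENT", and "DECREMENT" are hypothetical. Only the attribute name `cql.update.method` comes from the comment itself.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

class AttributeBackedOption {
    static final String USE_ATTRIBUTE = "USE_ATTR";            // hypothetical sentinel option
    static final String ATTRIBUTE_NAME = "cql.update.method";  // attribute named in the review
    static final Set<String> ALLOWED =
            new HashSet<>(Arrays.asList("SET", "INCREMENT", "DECREMENT")); // assumed option names

    // Returns the effective update method: the configured option, or the
    // flowfile attribute when the sentinel option was chosen.
    static String resolve(String configuredValue, Map<String, String> flowFileAttributes) {
        String candidate = USE_ATTRIBUTE.equals(configuredValue)
                ? flowFileAttributes.get(ATTRIBUTE_NAME)
                : configuredValue;
        if (candidate == null || candidate.trim().isEmpty()) {
            throw new IllegalArgumentException("Update Method is not specified");
        }
        String normalized = candidate.trim().toUpperCase(Locale.ROOT);
        if (!ALLOWED.contains(normalized)) {
            throw new IllegalArgumentException("Update Method '" + candidate + "' is not valid.");
        }
        return normalized;
    }
}
```

Because the attribute value is only known per flow file, the empty and invalid cases have to be handled when the flow file is processed, not when the processor is configured.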


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366563487
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -57,6 +60,11 @@
 @CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " +
         "configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.")
 public class PutCassandraRecord extends AbstractCassandraProcessor {
+    static final String UPDATE_TYPE = "UPDATE";
 
 Review comment:
   These should probably be `AllowableValue` instances rather than `String`s; that way you can still include the name, but also a display name and a description (which is used for tooltips). You can still add them to the property the same way, since the `.allowableValues()` method accepts either Strings or AllowableValues.

[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369071421
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that is can be used,
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
+                        throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Long, and cannot be used" +
+                                " to increment or decrement.");
+                    }
+
+                    if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.incr(fieldName, (Long)fieldValue);
+                    } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.decr(fieldName, (Long)fieldValue);
+                    } else {
+                        throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
 
 Review comment:
   Do you mean, is it intentional?
   
   If, for example, we want to set string fields but have an error in the update method (let's say it has a typo in it, like 'SED' instead of 'SET'), the resulting error message "Field is not of type Long..." would be very misleading.
   
   In general, the validity of the update method _itself_ is a higher-level issue than the validity of the _parameters_ of the update method, so it might be better to check that first.
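
   The ordering suggested above could look roughly like this. It is a simplified sketch without the DataStax `QueryBuilder`: `UpdateMethodCheck`, its `Method` enum, and the plain-text assignment it returns are hypothetical stand-ins, and the returned string is not real CQL (values are not quoted). The point is only that the method name is validated once, before any per-field type check runs.

```java
import java.util.Locale;

// Hypothetical sketch: validate the update method before looking at field values.
final class UpdateMethodCheck {
    // Assumed method names, for illustration only.
    enum Method { SET, INCREMENT, DECREMENT }

    static Method parse(String updateMethod) {
        try {
            return Method.valueOf(updateMethod.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException | NullPointerException e) {
            // Unknown or null method names are reported as such, never as a field type error.
            throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
        }
    }

    // Returns a simplified assignment fragment for a single non-key field.
    static String describeAssignment(String updateMethod, String fieldName, Object fieldValue) {
        Method method = parse(updateMethod); // higher-level check first
        if (method == Method.SET) {
            return fieldName + "=" + fieldValue;
        }
        // Only increment/decrement need a numeric value.
        if (!(fieldValue instanceof Number)) {
            throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Number");
        }
        long delta = ((Number) fieldValue).longValue();
        return fieldName + "=" + fieldName + (method == Method.INCREMENT ? "+" : "-") + delta;
    }
}
```

   With this ordering, a misspelled method such as 'SED' applied to a string field reports the invalid method rather than a type mismatch.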

[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375287529
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -362,6 +381,38 @@ private Long convertFieldObjectToLong(String name, Object value) {
         return ((Number) value).longValue();
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys and update method are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+            if (StringUtils.isEmpty(updateMethod)) {
 
 Review comment:
   True, removed.

[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368970533
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that is can be used,
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
 
 Review comment:
   Why can only Long types be used to increment or decrement?
   The schema could be inferred from a JSON or a CSV, and some fields could be integers.
   The Cassandra update statement could still be valid, couldn't it?

[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369698959
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java
 ##########
 @@ -0,0 +1,277 @@
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Statement;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+public class PutCassandraRecordUpdateTest {
+    private PutCassandraRecord testSubject;
+
+    @Mock
+    private RecordSchema schema;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        testSubject = new PutCassandraRecord();
+    }
+
+    @Test
+    public void testGenerateUpdateWithEmptyKeyList() {
+        Stream.of("", ",", ",,,").forEach(updateKeys -> testGenerateUpdate(
+                "keyspace.table",
+                updateKeys,
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                new IllegalArgumentException("No Update Keys were specified")
+        ));
+    }
+
+    @Test
+    public void testGenerateUpdateWithMissingKey() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField,missingKeyField",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                new IllegalArgumentException("Update key 'missingKeyField' is not present in the record schema")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateWithInvalidUpdateMethod() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                "invalidUpdateMethod",
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                new IllegalArgumentException("Update Method 'invalidUpdateMethod' is not valid.")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementString() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "15")
+                ),
+                new IllegalArgumentException("Field 'stringField' is not of type Number")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateSimpleTableName() {
+        testGenerateUpdate(
+                "table",
+                "keyField1",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE table SET stringField='newStringValue' WHERE keyField1=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateKeyspacedTableName() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField1",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateMultipleKeys() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField1,keyField2,keyField3",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("keyField2", "key2"),
+                        new Tuple<>("keyField3", 123L),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1 AND keyField2='key2' AND keyField3=123;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementLong() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                "UPDATE keyspace.table SET longField=longField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateDecrementLong() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.DECR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                "UPDATE keyspace.table SET longField=longField-15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementInteger() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementFloat() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("floatField", 15.05F)
+                ),
+                "UPDATE keyspace.table SET floatField=floatField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementDouble() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("doubleField", 15.05D)
+                ),
+                "UPDATE keyspace.table SET doubleField=doubleField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateSetMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue"),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue',integerField=15,longField=67 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField+15,longField=longField+67 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateDecrementMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.DECR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField-15,longField=longField-67 WHERE keyField=1;"
+        );
+    }
+
+    private void testGenerateUpdate(String table, String updateKeys, String updateMethod, List<Tuple<String, Object>> records, String expected) {
+        Map<String, Object> recordContentMap = records.stream()
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+
+        List<String> fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList());
+
+        when(schema.getFieldNames()).thenReturn(fieldNames);
+        Statement actual = testSubject.generateUpdate(table, schema, updateKeys, updateMethod, recordContentMap);
+
+        assertEquals(expected, actual.toString());
+    }
+
+    private <E extends Exception> void testGenerateUpdate(String table, String updateKeys, String updateMethod, List<Tuple<String, Object>> records, E expected) {
 
 Review comment:
   Minor: The `table` parameter is not used (it is hardcoded instead). I guess it's true that in this case it is irrelevant, as we expect an exception (one that is unrelated to the `table` name).
   
   Still, we could either remove the parameter or just use it regardless. (Actually, it's possible to provide an invalid table name, though I'm not sure we consider that a valid scenario. But "keyspace.", for example, would result in an exception.)
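
   On the "keyspace." case: in plain Java, `"keyspace.".split("\\.")` drops the trailing empty string and returns a one-element array, so indexing `[1]` as the quoted `generateUpdate` does would throw an `ArrayIndexOutOfBoundsException`. A defensive split might look like the sketch below. `TableNames` and its error message are hypothetical, and nothing in this thread says the PR adopted such a check.

```java
// Hypothetical helper: splits an optionally keyspace-qualified table name.
final class TableNames {
    // Returns {keyspace, table}; keyspace is null when the name has no dot.
    static String[] split(String cassandraTable) {
        // Limit -1 keeps trailing empty strings so "keyspace." yields two parts.
        String[] parts = cassandraTable.split("\\.", -1);
        if (parts.length == 1 && !parts[0].isEmpty()) {
            return new String[] {null, parts[0]};
        }
        if (parts.length == 2 && !parts[0].isEmpty() && !parts[1].isEmpty()) {
            return parts;
        }
        throw new IllegalArgumentException("Invalid table name '" + cassandraTable + "'");
    }
}
```

   A test for the exception path could then pass "keyspace." as the `table` argument and expect this `IllegalArgumentException`, which would give the currently unused parameter a purpose.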

[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369021524
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that is can be used,
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
 
 Review comment:
   The DataStax Cassandra driver implementation expects a long as the parameter to incr(). Do you have a recommendation on how to reliably cast an Object to a long/Long? What types besides long and integer do we expect?
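
   One way to answer this, consistent with the `convertFieldObjectToLong` fragment and the "is not of type Number" test expectation quoted elsewhere in this thread, is to accept any `java.lang.Number` and narrow it with `longValue()`. The sketch below is an illustration under that assumption; `CounterValues.toLong` is a hypothetical name, not the method from the PR.

```java
// Hypothetical helper: converts a record field value to the long expected by incr()/decr().
final class CounterValues {
    static long toLong(String fieldName, Object value) {
        // Integer, Long, Short, Float, Double, BigDecimal, etc. all extend Number.
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Number");
        }
        // longValue() truncates fractional parts, e.g. 15.05 becomes 15.
        return ((Number) value).longValue();
    }
}
```

   Note that truncation is silent: a float or double field of 15.05 increments by 15, which matches the float and double test cases quoted in this thread.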

[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368672071
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +298,74 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
 
 Review comment:
   I feel like this is a decent amount of logic. We could add some tests to cover it.

[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366836539
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +75,35 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values.")
 
 Review comment:
   Hey @mattyb149, I don't see many more statement types, as Cassandra doesn't really have a notion of upsert (an insert and an update are mostly the same, aside from COUNTER tables, where insert doesn't work). The only thing I see in the future is a DELETE statement, for which the update methods would also not be applicable. On the other hand, this feels cleaner than having "Update Set, Update Increment, ...". I'm fine either way, so if fewer properties are preferred then I can get rid of the update method.
   
   Depending on your answer to the above, I'll add the cql.update.method attribute and/or the cql.statement.type attribute as well.


[GitHub] [nifi] woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-576290956
 
 
   The build is failing because of an HTTP/HTTPS issue with Maven (I believe it is unrelated to the commits in this PR). Let me know if I can go ahead and rebase against master.


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369021524
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that can be used
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
 
 Review comment:
   The DataStax Cassandra driver implementation expects a long as the parameter to incr(). It looks like we can instead use
   `long b = ((Number) a).longValue()`
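
As an illustration of the conversion suggested above, here is a minimal sketch. The class name `CounterValues` and the method `toLong` are hypothetical and are not taken from the PR; the sketch only shows how any `Number` subtype can be widened to a `long`, with non-numeric values rejected.

```java
// Hypothetical helper, not the PR's actual code: convert a numeric record
// value to the long that an increment or decrement assignment needs.
final class CounterValues {

    static long toLong(String fieldName, Object value) {
        // Anything that is not a Number (String, null, ...) cannot be used as a counter delta.
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Number");
        }
        return ((Number) value).longValue();
    }

    public static void main(String[] args) {
        System.out.println(toLong("goals", 3));          // Integer input, prints 3
        System.out.println(toLong("goals", (short) 7));  // Short input, prints 7
        System.out.println(toLong("goals", 42L));        // Long input, prints 42
    }
}
```

Note that `longValue()` truncates fractional values (for example, `2.9` becomes `2`), so a stricter check may be preferable if floating-point fields should be rejected.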


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r374252991
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -129,8 +210,39 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
 
 Review comment:
   If the `Update Keys` property is not set, then `updateKeys` is `null`, not the empty string that the unit tests set. This causes a NullPointerException where the update keys are split from the property value. A null check should be added, either to default the value to the empty string or to throw a more descriptive exception than an NPE. This can/should also be done in `customValidate`, which illustrates the need for custom validation code (albeit duplicated at times).
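
One possible shape for the null check described above, as a sketch only. The class `UpdateKeyParsing` and the method `parseUpdateKeys` are hypothetical names, and treating an unset property as "no keys" is an assumption; the caller would still have to reject an empty result.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

final class UpdateKeyParsing {

    // Hypothetical helper: an unset (null) property is treated like an empty one,
    // so the split below can never throw a NullPointerException.
    static Set<String> parseUpdateKeys(String updateKeys) {
        if (updateKeys == null) {
            return Collections.emptySet();
        }
        return Arrays.stream(updateKeys.split(","))
                .map(String::trim)
                .filter(key -> !key.isEmpty())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(parseUpdateKeys(null).isEmpty());   // unset property
        System.out.println(parseUpdateKeys(" , ").isEmpty());  // only separators and blanks
        System.out.println(parseUpdateKeys("id, name"));       // two usable keys
    }
}
```

With this shape, the existing "No Update Keys were specified" check can handle both the unset and the blank case in one place.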


[GitHub] [nifi] woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-579218673
 
 
   > The `PutCassandraRecordUpdateTest.java` needs a license but otherwise this looks ready.
   
   Done :-). 


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368646980
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -34,29 +41,68 @@
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 Review comment:
   The wildcard import causes a Checkstyle violation.


[GitHub] [nifi] asfgit closed pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977
 
 
   


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r383290090
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +319,106 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    protected Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.incr(fieldName, convertFieldObjectToLong(fieldName, fieldValue));
+                } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.decr(fieldName, convertFieldObjectToLong(fieldName, fieldValue));
+                } else {
+                    throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
+                }
+                updateQuery.with(assignment);
+            }
+        }
+        return updateQuery;
+    }
+
+    private Long convertFieldObjectToLong(String name, Object value) {
+        if (!(value instanceof Number)) {
+            throw new IllegalArgumentException("Field '" + name + "' is not of type Number");
+        }
+        return ((Number) value).longValue();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+
+            // Check that if the update method is set to increment or decrement that the batch statement type is set to
+            // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point).
+            String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
+            if (!Sets.newHashSet(COUNTER_TYPE.getValue(), UNLOGGED_TYPE.getValue(), BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue()).contains(batchStatementType)) {
 
 Review comment:
   There's a logic error here. If my Batch Statement Type is LOGGED but my Update Method is "Set", the processor is invalid due to the line below. It would probably be better to use some explicit if-checks rather than a Set.contains() here.
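
A sketch of what the explicit checks could look like, under stated assumptions: the string values below ("INCREMENT", "DECREMENT", "COUNTER", "UNLOGGED") are placeholders and may not match the processor's real allowable values, the class `CounterBatchCheck` is hypothetical, and the "use attribute" case mentioned in the code comment is left out for brevity.

```java
final class CounterBatchCheck {

    // Placeholder values; the processor's real AllowableValue strings may differ.
    static boolean isValid(String updateMethod, String batchStatementType) {
        boolean counterUpdate = "INCREMENT".equalsIgnoreCase(updateMethod)
                || "DECREMENT".equalsIgnoreCase(updateMethod);
        if (!counterUpdate) {
            // A plain SET update places no restriction on the batch statement type.
            return true;
        }
        // Increment and decrement are only accepted with counter or unlogged batches.
        return "COUNTER".equalsIgnoreCase(batchStatementType)
                || "UNLOGGED".equalsIgnoreCase(batchStatementType);
    }

    public static void main(String[] args) {
        System.out.println(isValid("SET", "LOGGED"));         // prints true
        System.out.println(isValid("INCREMENT", "LOGGED"));   // prints false
        System.out.println(isValid("DECREMENT", "COUNTER"));  // prints true
    }
}
```

The point of the sketch is that the batch type is only inspected once the update method is known to be a counter operation, which avoids flagging a LOGGED batch combined with SET.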


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369052930
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +113,36 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE.getValue())
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE, STATEMENT_TYPE_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values. This property is used if the Statement Type is " +
+                    "UPDATE and ignored otherwise.")
+            .required(false)
+            .defaultValue(SET_TYPE.getValue())
+            .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE, UPDATE_METHOD_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-keys")
+            .displayName("Update Keys")
+            .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. "
+                    + "This property is ignored if the Statement Type is not UPDATE.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 
 Review comment:
   Interesting: I ran some test cases against this validator and it exhibits some (for me) unexpected behavior.
   
   * Passing an empty string to it fails validation (expected)
   * Passing just a separator (,) validates (unexpected), regardless of whether excludeEmptyEntries is set to true or false
   
   I don't see much documentation for that validator, but wouldn't it be logical for "," to fail validation? Debugging shows that Java's split method only returns the empty entries if we use the overloaded version `",".split(",", -1)`.
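
The `split` behavior described above can be reproduced with the JDK alone. In this small sketch the class `SplitDemo` and the method `entryCount` are hypothetical names used only for the demonstration; it does not reproduce the NiFi validator itself.

```java
final class SplitDemo {

    // Counts the entries produced by splitting on a comma, with or without
    // the negative limit that keeps trailing empty strings.
    static int entryCount(String value, boolean keepTrailingEmpty) {
        return keepTrailingEmpty
                ? value.split(",", -1).length
                : value.split(",").length;
    }

    public static void main(String[] args) {
        // Default split drops trailing empty strings, so "," yields no entries at all.
        System.out.println(entryCount(",", false));  // prints 0
        // A negative limit keeps them: one empty string on each side of the comma.
        System.out.println(entryCount(",", true));   // prints 2
    }
}
```

Because the default split yields zero entries for ",", a check that loops over the entries never sees an empty one to reject.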


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366566815
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +231,74 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, we need at least 1 key.
+        final Set<String> updateKeyNames;
+        updateKeyNames = new HashSet<>();
+        for (final String updateKey : updateKeys.split(",")) {
+            updateKeyNames.add(updateKey.trim());
 
 Review comment:
   You may want to check for the empty string here before adding to the set. There's a common pattern you see in NiFi, something like:
   
   ```
   updateKeyNames = Arrays.stream(updateKeys.split(","))
      .map(String::trim)
      .filter(StringUtils::isNotEmpty)
      .collect(Collectors.toSet());
   ```


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366568013
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
 ##########
 @@ -120,6 +120,27 @@ public void testSimplePut() throws InitializationException {
         testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
     }
 
+    @Test
+    public void testSimpleUpdate() throws InitializationException {
 
 Review comment:
   Is it possible to add unit tests for the other update methods? I know the whole "server" is mocked out, but might be nice to test non-happy-paths such as non-Long fields attempting to be incremented.


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369666608
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +113,36 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE.getValue())
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE, STATEMENT_TYPE_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values. This property is used if the Statement Type is " +
+                    "UPDATE and ignored otherwise.")
+            .required(false)
+            .defaultValue(SET_TYPE.getValue())
+            .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE, UPDATE_METHOD_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-keys")
+            .displayName("Update Keys")
+            .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. "
+                    + "This property is ignored if the Statement Type is not UPDATE.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 
 Review comment:
   I've found some people who agree with me that it's a bug:
   https://issues.apache.org/jira/browse/NIFI-7055


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375170038
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -362,6 +381,38 @@ private Long convertFieldObjectToLong(String name, Object value) {
         return ((Number) value).longValue();
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys and update method are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+            if (StringUtils.isEmpty(updateMethod)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Method must be specified as well").build());
+            }
+
+            // Check that if the update method is set to increment or decrement that the batch statement type is set to
+            // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point).
+            String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
+            if (!Set.of(COUNTER_TYPE.getValue(), UNLOGGED_TYPE.getValue(), BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue()).contains(batchStatementType)) {
 
 Review comment:
   `Set.of` is a Java 9 feature, so it will not compile on JDK 8.
   Could be replaced with `com.google.common.collect.Sets.newHashSet`.
   
   Also, for me `import java.util.Collection;` was missing from this file.
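
For comparison, a JDK-only alternative that also compiles on Java 8 is sketched below. It is offered as an option alongside the Guava suggestion, not as what the PR ended up using; the class `AllowedBatchTypes` and the string values are placeholders standing in for the processor's constants.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class AllowedBatchTypes {

    // Placeholder strings stand in for the processor's AllowableValue constants.
    // Arrays.asList plus HashSet is available on Java 8, unlike Set.of.
    static final Set<String> COUNTER_COMPATIBLE =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("COUNTER", "UNLOGGED")));

    static boolean isCounterCompatible(String batchStatementType) {
        return COUNTER_COMPATIBLE.contains(batchStatementType);
    }

    public static void main(String[] args) {
        System.out.println(isCounterCompatible("COUNTER"));  // prints true
        System.out.println(isCounterCompatible("LOGGED"));   // prints false
    }
}
```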


[GitHub] [nifi] mattyb149 commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-590528150
 
 
   +1 LGTM, ran contrib-check (after fixing Checkstyle issue) and tests on a live NiFi using various settings, attributes, etc.  Thanks for sticking with this improvement! Merging to master


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368851237
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -129,8 +205,39 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
 
 Review comment:
   I think you are referring to the update keys/update method being present when the statement type is update. However, I don't think that would work with setting the statement type using an attribute (cql.statement.type). In that case we would still need the current validation logic and I'm a bit hesitant to have it duplicated.


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375886439
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -255,14 +257,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 throw new IllegalArgumentException(format("Statement Type is not specified, FlowFile %s", inputFlowFile));
             }
 
-            // throw an exception if the statement type is set to update and updateMethod or updateKeys is empty
-            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
-                if (StringUtils.isEmpty(updateMethod)) {
-                    throw new IllegalArgumentException(format("Update Method is not specified, FlowFile %s", inputFlowFile));
-                }
-                if (StringUtils.isEmpty(updateKeys)) {
+            // throw an exception if the statement type is set to update and updateKeys is empty
+            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && StringUtils.isEmpty(updateKeys)) {
 
 Review comment:
   I've added a test for this, but this is already checked on lines 355 to 367 (exception thrown on 366).


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r370017696
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java
 ##########
 @@ -0,0 +1,277 @@
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Statement;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+public class PutCassandraRecordUpdateTest {
+    private PutCassandraRecord testSubject;
+
+    @Mock
+    private RecordSchema schema;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        testSubject = new PutCassandraRecord();
+    }
+
+    @Test
+    public void testGenerateUpdateWithEmptyKeyList() {
+        Stream.of("", ",", ",,,").forEach(updateKeys -> testGenerateUpdate(
+                "keyspace.table",
+                updateKeys,
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                new IllegalArgumentException("No Update Keys were specified")
+        ));
+    }
+
+    @Test
+    public void testGenerateUpdateWithMissingKey() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField,missingKeyField",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                new IllegalArgumentException("Update key 'missingKeyField' is not present in the record schema")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateWithInvalidUpdateMethod() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                "invalidUpdateMethod",
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                new IllegalArgumentException("Update Method 'invalidUpdateMethod' is not valid.")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementString() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "15")
+                ),
+                new IllegalArgumentException("Field 'stringField' is not of type Number")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateSimpleTableName() {
+        testGenerateUpdate(
+                "table",
+                "keyField1",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE table SET stringField='newStringValue' WHERE keyField1=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateKeyspacedTableName() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField1",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateMultipleKeys() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField1,keyField2,keyField3",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("keyField2", "key2"),
+                        new Tuple<>("keyField3", 123L),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue' WHERE keyField1=1 AND keyField2='key2' AND keyField3=123;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementLong() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                "UPDATE keyspace.table SET longField=longField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateDecrementLong() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.DECR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                "UPDATE keyspace.table SET longField=longField-15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementInteger() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementFloat() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("floatField", 15.05F)
+                ),
+                "UPDATE keyspace.table SET floatField=floatField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementDouble() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("doubleField", 15.05D)
+                ),
+                "UPDATE keyspace.table SET doubleField=doubleField+15 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateSetMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue"),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue',integerField=15,longField=67 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField+15,longField=longField+67 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateDecrementMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.DECR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField-15,longField=longField-67 WHERE keyField=1;"
+        );
+    }
+
+    private void testGenerateUpdate(String table, String updateKeys, String updateMethod, List<Tuple<String, Object>> records, String expected) {
+        Map<String, Object> recordContentMap = records.stream()
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+
+        List<String> fieldNames = records.stream().map(Tuple::getKey).collect(Collectors.toList());
+
+        when(schema.getFieldNames()).thenReturn(fieldNames);
+        Statement actual = testSubject.generateUpdate(table, schema, updateKeys, updateMethod, recordContentMap);
+
+        assertEquals(expected, actual.toString());
+    }
+
+    private <E extends Exception> void testGenerateUpdate(String table, String updateKeys, String updateMethod, List<Tuple<String, Object>> records, E expected) {
 
 Review comment:
   I think that moves more into the territory of determining what is valid CQL.
   
   Actually, when I was adding the code I removed the table parameter and then re-added it, mostly for aesthetic reasons. In the future we may want to add more tests with different table names, and this way calls to testGenerateUpdate look the same whether the expected result is an exception or a CQL string.


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366569391
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +75,35 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values.")
 
 Review comment:
   Do you see other Statement Types being included someday, such as UPSERT, DELETE, etc.? I ask because I wonder if we need a separate property for Update Method or if we can include 3 Statement Types (Update Set, Update Increment, Update Decrement) and only use one property. Then if we add UPSERT, DELETE, etc. we don't have extra properties that only pertain to one of the Statement Types.
   
   Either way is fine with me, just trying to design for the future so we don't run into compatibility or awkward configuration issues (too many properties, e.g.)
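
A rough sketch of the single-property alternative described above, in plain Java rather than NiFi's `AllowableValue` API. The enum name, the display names, and the `needsUpdateKeys` flag are all hypothetical and only illustrate how one Statement Type property could carry the update method as well.

```java
// Hypothetical combined statement types: the update method is folded into the type,
// so no separate "Update Method" property is needed.
enum CombinedStatementType {
    INSERT("Insert", false),
    UPDATE_SET("Update Set", true),
    UPDATE_INCREMENT("Update Increment", true),
    UPDATE_DECREMENT("Update Decrement", true);

    private final String displayName;
    private final boolean needsUpdateKeys;

    CombinedStatementType(String displayName, boolean needsUpdateKeys) {
        this.displayName = displayName;
        this.needsUpdateKeys = needsUpdateKeys;
    }

    String displayName() {
        return displayName;
    }

    // True for every update variant, which all need a WHERE clause built from the update keys.
    boolean needsUpdateKeys() {
        return needsUpdateKeys;
    }

    // Case-insensitive lookup, so a value supplied via a FlowFile attribute resolves the same way.
    static CombinedStatementType fromDisplayName(String value) {
        for (CombinedStatementType type : values()) {
            if (type.displayName.equalsIgnoreCase(value)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Statement Type '" + value + "' is not valid.");
    }
}
```

With this shape, adding DELETE or another type later would mean adding one enum constant rather than another property that applies to only one type.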


[GitHub] [nifi] tpalfy commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-579860538
 
 
   > COUNTER is just another batch statement type, which I've already added. Just make sure that you select COUNTER in the Batch Statement Type (can also be supplied via cql.batch.statement.type if required).
   
   Ah okay. In fact it seems to work with COUNTER or UNLOGGED. Hmm, this caught me off guard. It would be great if a validator could have caught it. I know that values can come from an attribute, but if the value is one of the predefined values, validation could still be done.
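
A minimal sketch of the kind of check meant here, written as plain Java instead of a NiFi `customValidate` override. The class name, the string values ("INCREMENT", "DECREMENT", "COUNTER", "UNLOGGED") and the `USE_ATTRIBUTE` marker are placeholders; the processor's real allowable values may differ.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

final class BatchTypeCheck {
    // Placeholder marker meaning "the value is read from a FlowFile attribute at runtime".
    static final String USE_ATTRIBUTE = "USE_ATTR";

    // Batch statement types observed in this thread to work with increment/decrement updates.
    private static final Set<String> COUNTER_COMPATIBLE =
            new HashSet<>(Arrays.asList("COUNTER", "UNLOGGED"));

    private BatchTypeCheck() {
    }

    // Returns an explanation when the configured combination is known to be invalid,
    // and an empty Optional when it is valid or cannot be judged at configuration time.
    static Optional<String> validate(String updateMethod, String batchStatementType) {
        boolean counterUpdate = "INCREMENT".equalsIgnoreCase(updateMethod)
                || "DECREMENT".equalsIgnoreCase(updateMethod);
        if (!counterUpdate) {
            return Optional.empty();
        }
        if (USE_ATTRIBUTE.equalsIgnoreCase(batchStatementType)) {
            // The real value is only known per FlowFile, so skip the check here.
            return Optional.empty();
        }
        if (batchStatementType != null
                && COUNTER_COMPATIBLE.contains(batchStatementType.toUpperCase(Locale.ROOT))) {
            return Optional.empty();
        }
        return Optional.of("Increment/Decrement updates require Batch Statement Type COUNTER or UNLOGGED");
    }
}
```

Only the predefined values are checked; anything deferred to an attribute still needs the runtime check in onTrigger.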


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r383381823
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -394,11 +394,17 @@ private Long convertFieldObjectToLong(String name, Object value) {
 
             // Check that if the update method is set to increment or decrement that the batch statement type is set to
             // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point).
+            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
             String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
-            if (!Sets.newHashSet(COUNTER_TYPE.getValue(), UNLOGGED_TYPE.getValue(), BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue()).contains(batchStatementType)) {
 
 Review comment:
   Removing this causes a CheckStyle error for the unused import for Sets. I can remove that while merging, but if I end up leaving another comment asking for changes please change this too and run Maven with `-Pcontrib-check` for the cassandra bundle to make sure the build will succeed, thanks!


[GitHub] [nifi] woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on issue #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#issuecomment-581299841
 
 
   > LGTM +1.
   > Tested with running Cassandra.
   > (Extra validation would be welcome but I think it's good to go.)
   
   cool :-). Extra validation I think is for another PR, as it potentially involves a bunch of extra code and tests.


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368658975
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +298,74 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, we need at least 1 key.
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
 
 Review comment:
   Maybe it would be useful to check if the schema has all the `updateKeyNames`?
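
Something along these lines, sketched as a standalone helper so it can be tried without NiFi on the classpath. The class and method names are illustrative, and the schema is represented by its list of field names (what `RecordSchema.getFieldNames()` would supply).

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class UpdateKeyCheck {
    private UpdateKeyCheck() {
    }

    // Splits a comma-separated key list, trimming whitespace and dropping empty entries.
    static Set<String> parseKeys(String updateKeys) {
        return Arrays.stream(updateKeys.split(","))
                .map(String::trim)
                .filter(key -> !key.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Fails fast if any configured update key is absent from the schema's field names,
    // instead of silently producing an UPDATE with an incomplete WHERE clause.
    static void requireKeysInSchema(Set<String> updateKeyNames, List<String> schemaFieldNames) {
        for (String updateKey : updateKeyNames) {
            if (!schemaFieldNames.contains(updateKey)) {
                throw new IllegalArgumentException(
                        "Update key '" + updateKey + "' is not present in the record schema");
            }
        }
    }
}
```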


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368969710
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that is can be used,
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
+                        throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Long, and cannot be used" +
+                                " to increment or decrement.");
+                    }
+
+                    if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.incr(fieldName, (Long)fieldValue);
+                    } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                        assignment = QueryBuilder.decr(fieldName, (Long)fieldValue);
+                    } else {
+                        throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
 
 Review comment:
   If the Update Method is invalid but the field is not of type Long, we get a "Field is not of type Long, and cannot be used to increment or decrement." error message (instead of the "Update Method is not valid.").
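
One way to get the more specific message is to resolve the update method before looking at the value's type. The sketch below shows that ordering only; the `Method` enum, its names, and the text rendering in `describe` are hypothetical stand-ins for the processor's allowable values and the driver's `Assignment` objects.

```java
final class UpdateAssignment {
    // Stand-in for the processor's Set / Increment / Decrement update methods.
    enum Method { SET, INCREMENT, DECREMENT }

    private UpdateAssignment() {
    }

    // Resolves the method name first, so an unknown method is reported
    // before any type check on the field value can fire.
    static Method parseMethod(String updateMethod) {
        for (Method method : Method.values()) {
            if (method.name().equalsIgnoreCase(updateMethod)) {
                return method;
            }
        }
        throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
    }

    // Renders a single assignment as text, purely for illustration.
    static String describe(String updateMethod, String fieldName, Object fieldValue) {
        Method method = parseMethod(updateMethod);
        if (method == Method.SET) {
            return fieldName + "=" + fieldValue;
        }
        // Only reached for a valid increment/decrement method.
        if (!(fieldValue instanceof Long)) {
            throw new IllegalArgumentException("Field '" + fieldName
                    + "' is not of type Long, and cannot be used to increment or decrement.");
        }
        String operator = method == Method.INCREMENT ? "+" : "-";
        return fieldName + "=" + fieldName + operator + fieldValue;
    }
}
```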


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r369021524
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +303,81 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    private Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else {
+                    // Check if the fieldValue is of type long, as this is the only type that is can be used,
+                    // to increment or decrement.
+                    if (!(fieldValue instanceof Long)) {
 
 Review comment:
   The DataStax Cassandra driver's incr() expects a long as its parameter. Do you have a recommendation on how to reliably convert an Object to a long/Long?
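
One possible answer, sketched here under the assumption that truncating fractional values is acceptable: accept any `Number` and call `longValue()`, rejecting everything else. This mirrors the `convertFieldObjectToLong` helper that appears in another revision quoted in this thread; the class name `FieldConversion` and method name `toLong` are illustrative only.

```java
final class FieldConversion {
    private FieldConversion() {
    }

    // Converts a record field value to a long for use as an increment/decrement amount.
    // Any Number is accepted; Float and Double values lose their fractional part
    // because longValue() truncates toward zero.
    static long toLong(String fieldName, Object value) {
        if (!(value instanceof Number)) {
            // Also covers null, since null is not an instance of anything.
            throw new IllegalArgumentException("Field '" + fieldName + "' is not of type Number");
        }
        return ((Number) value).longValue();
    }
}
```

Strings such as "15" are rejected rather than parsed, which keeps the conversion predictable at the cost of requiring a numeric field type in the record schema.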


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r383319786
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -193,6 +319,106 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
 
     }
 
+    protected Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.incr(fieldName, convertFieldObjectToLong(fieldName, fieldValue));
+                } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.decr(fieldName, convertFieldObjectToLong(fieldName, fieldValue));
+                } else {
+                    throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
+                }
+                updateQuery.with(assignment);
+            }
+        }
+        return updateQuery;
+    }
+
+    private Long convertFieldObjectToLong(String name, Object value) {
+        if (!(value instanceof Number)) {
+            throw new IllegalArgumentException("Field '" + name + "' is not of type Number");
+        }
+        return ((Number) value).longValue();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) super.customValidate(validationContext);
+
+        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys are set
+            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
+            }
+
+            // Check that if the update method is set to increment or decrement that the batch statement type is set to
+            // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at this point).
+            String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
+            if (!Sets.newHashSet(COUNTER_TYPE.getValue(), UNLOGGED_TYPE.getValue(), BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue()).contains(batchStatementType)) {
 
 Review comment:
   Right, I wrote it in the comments but didn't write the actual check. Good catch :-).
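
   For illustration, the check described in the code comment above could be sketched in plain Java as follows. This is a minimal sketch rather than the PR's code: the class name and the string constants (`INCREMENT`, `DECREMENT`, `COUNTER`, `UNLOGGED`, `USE_ATTR`) are assumptions standing in for the processor's allowable values, and the NiFi validation API is deliberately left out so the logic stands alone.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for the processor's validation logic. The string
// constants are assumptions, not the processor's real allowable values.
final class CounterBatchCheck {
    static final String USE_ATTR = "USE_ATTR";
    private static final Set<String> COUNTER_METHODS =
            new HashSet<>(Arrays.asList("INCREMENT", "DECREMENT"));
    private static final Set<String> COUNTER_BATCH_TYPES =
            new HashSet<>(Arrays.asList("COUNTER", "UNLOGGED"));

    private CounterBatchCheck() {
    }

    /**
     * Returns an explanation when the combination is known to be invalid,
     * or an empty Optional when it is acceptable or cannot be decided yet.
     */
    static Optional<String> validate(String updateMethod, String batchStatementType) {
        // Values resolved from FlowFile attributes are unknown at validation time.
        if (USE_ATTR.equalsIgnoreCase(updateMethod) || USE_ATTR.equalsIgnoreCase(batchStatementType)) {
            return Optional.empty();
        }
        boolean counterUpdate = updateMethod != null
                && COUNTER_METHODS.contains(updateMethod.toUpperCase(Locale.ROOT));
        boolean counterBatch = batchStatementType != null
                && COUNTER_BATCH_TYPES.contains(batchStatementType.toUpperCase(Locale.ROOT));
        if (counterUpdate && !counterBatch) {
            return Optional.of("Update Method '" + updateMethod
                    + "' requires a COUNTER or UNLOGGED batch, but the Batch Statement Type is '"
                    + batchStatementType + "'");
        }
        return Optional.empty();
    }
}
```

   In the processor itself, a non-empty result would presumably be turned into an invalid `ValidationResult` inside `customValidate`; combinations involving attribute-based values still need a run-time check.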

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368659695
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -129,8 +205,39 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
 
 Review comment:
   Checking correct property values could be done in a `customValidate` method.


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r368657405
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +113,36 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE.getValue())
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE, STATEMENT_TYPE_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values. This property is used if the Statement Type is " +
+                    "UPDATE and ignored otherwise.")
+            .required(false)
+            .defaultValue(SET_TYPE.getValue())
+            .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE, UPDATE_METHOD_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-keys")
+            .displayName("Update Keys")
+            .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. "
+                    + "This property is ignored if the Statement Type is not UPDATE.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
 
 Review comment:
   Can't we use something like `StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR)` here?
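
   As a rough illustration of what validating the property as a list would buy, here is a plain-Java sketch of the behavior I would expect from such a validator: entries are trimmed, and empty entries are rejected rather than silently dropped. The class and method names are hypothetical, and this is not the NiFi implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring a "trim entries, reject empty entries" list check.
final class UpdateKeyList {
    private UpdateKeyList() {
    }

    /** Splits a comma-separated value into trimmed keys, failing on any empty entry. */
    static List<String> parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("Update Keys must not be empty");
        }
        List<String> keys = new ArrayList<>();
        // A limit of -1 keeps trailing empty entries, so "id," is rejected too.
        for (String entry : raw.split(",", -1)) {
            String key = entry.trim();
            if (key.isEmpty()) {
                throw new IllegalArgumentException("Update Keys contains an empty entry: '" + raw + "'");
            }
            keys.add(key);
        }
        return keys;
    }
}
```

   Compared with the trim-and-filter stream in the PR, this variant reports a value such as `id,,name` as a configuration error instead of quietly ignoring the empty entry.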


[GitHub] [nifi] mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
mattyb149 commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366565442
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +75,35 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values.")
+            .required(false)
+            .defaultValue(SET_TYPE)
+            .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-keys")
+            .displayName("Update Keys")
+            .description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. "
+                    + "This property is ignored if the Statement Type is INSERT.")
 
 Review comment:
   Nitpick, but in case more verbs are added later, I think the doc should read "This property is ignored if the Statement Type is anything other than UPDATE."


[GitHub] [nifi] woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
woutifier-t commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r366836539
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -67,6 +75,35 @@
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE)
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values.")
 
 Review comment:
   Hey @mattyb149, I don't expect many more statement types, since Cassandra doesn't really have a notion of upsert (an insert and an update are already mostly the same, aside from COUNTER tables, where insert doesn't work). The only addition I can foresee is a DELETE statement, for which the update methods would not be applicable either. On the other hand, a separate Update Method property feels cleaner than statement types such as "Update Set", "Update Increment", and so on. I'm fine either way, so if fewer properties are preferred I can get rid of the Update Method property.
   
   Depending on your answer to the above, I'll add the cql.update.method attribute and/or the cql.statement.type attribute as well.
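
   To make the three update methods concrete, the sketch below builds the CQL text each one corresponds to, using plain string concatenation. It is only an illustration: the PR builds statements through the DataStax `QueryBuilder`, and the class name, the method names, and the `SET`/`INCREMENT`/`DECREMENT` labels here are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical string-based illustration; not the QueryBuilder code from the PR.
final class UpdateCqlSketch {
    private UpdateCqlSketch() {
    }

    /** Fields listed in updateKeys become WHERE conditions; all others become assignments. */
    static String build(String table, List<String> fields, Set<String> updateKeys, String updateMethod) {
        List<String> assignments = new ArrayList<>();
        List<String> conditions = new ArrayList<>();
        for (String field : fields) {
            if (updateKeys.contains(field)) {
                conditions.add(field + " = ?");
            } else {
                assignments.add(assignment(field, updateMethod));
            }
        }
        return "UPDATE " + table
                + " SET " + String.join(", ", assignments)
                + " WHERE " + String.join(" AND ", conditions);
    }

    private static String assignment(String field, String updateMethod) {
        switch (updateMethod.toUpperCase(Locale.ROOT)) {
            case "SET":
                return field + " = ?";
            case "INCREMENT":
                // Counter columns are changed relative to their current value.
                return field + " = " + field + " + ?";
            case "DECREMENT":
                return field + " = " + field + " - ?";
            default:
                throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
        }
    }
}
```

   For a record with fields `id` and `hits`, update key `id`, and the increment method, this yields `UPDATE ks.counters SET hits = hits + ? WHERE id = ?`, which is the shape a counter table requires since a plain insert does not work there.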


[GitHub] [nifi] tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #3977: NIFI-7007 Add update functionality to the PutCassandraRecord processor.
URL: https://github.com/apache/nifi/pull/3977#discussion_r375908127
 
 

 ##########
 File path: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ##########
 @@ -255,14 +257,9 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 throw new IllegalArgumentException(format("Statement Type is not specified, FlowFile %s", inputFlowFile));
             }
 
-            // throw an exception if the statement type is set to update and updateMethod or updateKeys is empty
-            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
-                if (StringUtils.isEmpty(updateMethod)) {
-                    throw new IllegalArgumentException(format("Update Method is not specified, FlowFile %s", inputFlowFile));
-                }
-                if (StringUtils.isEmpty(updateKeys)) {
+            // throw an exception if the statement type is set to update and updateKeys is empty
+            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && StringUtils.isEmpty(updateKeys)) {
 
 Review comment:
   Okay, that seems to cover it.
