Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/16 07:57:47 UTC

[GitHub] [nifi] timeabarna opened a new pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

timeabarna opened a new pull request #4824:
URL: https://github.com/apache/nifi/pull/4824


   …er() method ensuring at least one acknowledgement check is happening with Splunk
   
   https://issues.apache.org/jira/browse/NIFI-8176#
   
   #### Description of PR
   
   adding flag to ensure at least one acknowledgment check with Splunk
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] simonbence commented on pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#issuecomment-779688897


   One more thing: please extend the additionalDetails file for the processor with the new behaviour





[GitHub] [nifi] simonbence commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r578386400



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -46,10 +46,21 @@ <h3>Unacknowledged and undetermined cases</h3>
     includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries,
     QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk
     within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship.
-    Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is
-    triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
-    Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might
-    result some false negative result as in case under higher load, Splunk server might index slower than it is expected.
+</p>
+
+<p>
+    Acknowledgment status of flow files being in the "undetermined" relationship are checked with Splunk and regardless of the result

Review comment:
       Flow files at this point are not transferred to the undetermined relationship (or any other). In a nutshell, when they are transferred into any relationship, they leave the scope of the processor. I would suggest phrasing it like this: every flow file, after the first acknowledgement poll query towards Splunk, will be extended with the attribute "ack.checked.at.splunk", provided the Splunk server responded. The attribute serves as a flag for later processing to determine if the acknowledgement was checked at least once.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -46,10 +46,21 @@ <h3>Unacknowledged and undetermined cases</h3>
     includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries,
     QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk
     within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship.
-    Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is
-    triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
-    Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might
-    result some false negative result as in case under higher load, Splunk server might index slower than it is expected.
+</p>
+
+<p>
+    Acknowledgment status of flow files being in the "undetermined" relationship are checked with Splunk and regardless of the result
+    an "ack.checked.at.splunk" attribute with true value will be put on them ensuring at least one acknowledgment check is happening
+    even outside of "Maximum waiting time".
+    Flow files having both their "ack.checked.at.splunk" attribute equals to true and being outside of the time range will be

Review comment:
       I would approach this from the other side: flow files that are timed out but do not have the "ack.checked.at.splunk" attribute are prevented from being transferred to the "unacknowledged" relationship and will instead be handled as "undetermined".
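
The rule described in this comment can be sketched in plain Java. This is only an illustration under stated assumptions, not the processor's code: `AckTimeoutRule` and `isUnacknowledged` are hypothetical names, times are assumed to be in milliseconds, and the comparison against the waiting time is assumed to be exclusive.

```java
// Sketch only: AckTimeoutRule and its method are hypothetical names, not NiFi code.
final class AckTimeoutRule {

    // Returns true only when the flow file may be routed to "unacknowledged":
    // the waiting time has elapsed AND Splunk was asked at least once.
    // Assumption: all times are in milliseconds and the bound is exclusive.
    static boolean isUnacknowledged(final long respondedAt, final long now,
                                    final long maxWaitingTime, final boolean ackChecked) {
        final boolean timedOut = now - respondedAt > maxWaitingTime;
        return timedOut && ackChecked;
    }

    public static void main(final String[] args) {
        final long ttl = 3 * 60 * 1000L; // "Maximum waiting time" of 3 minutes
        final long respondedAt = 0L;
        final long now = 4 * 60 * 1000L; // one minute past the waiting time

        // Timed out but never checked: stays "undetermined" so one check can still happen.
        System.out.println(isUnacknowledged(respondedAt, now, ttl, false)); // prints false
        // Timed out and already checked: may be routed to "unacknowledged".
        System.out.println(isUnacknowledged(respondedAt, now, ttl, true));  // prints true
    }
}
```

Expressing the condition as "timed out and already checked" keeps the guarantee of at least one acknowledgement check in a single place.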

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -54,6 +52,9 @@
 @ReadsAttributes({
         @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."),
         @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")})
+@WritesAttributes({
+        @WritesAttribute(attribute = "ack.checked.at.splunk", description = "Identifying whether Splunk acknowledgement check has happened.")

Review comment:
       Please extend the description with information about when the attribute is expected to be set (for example, it is not set if the acknowledgement has not been checked yet).

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>
+
+The following table represents four flow files being batch processed by the processor. For simplicity reasons "splunk.responded.at" time
+will be provided in HH:MM:SS format instead of Epoch time. The "Maximum waiting time" (referred as TTL from now on) is set for 3 minutes
+and the processor is scheduled to run every minute. The files are penalized for 30 seconds. "Undetermined" relationship is loop backed to the processor.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>No value set</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>First iteration</h4>
+
+The current time is 10:48:05 so all of our files are within TTL. Flow file 1 does not have a correct "splunk.acknowledgment.id"
+so will be directed to the "Failure" relationship. The remaining files will be sent for Splunk ack check.
+
+With the following Splunk response "acks:{[0:true, 1:false, 2:false]}" flow file 2 will be directed to "Success" relationship
+and the rest of the files to the "Undetermined" relationship and penalized for 30 seconds.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+        <td>Failure</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>true</td>
+        <td>Success</td>
+
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>true</td>
+        <td>Undetermined</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>true</td>
+        <td>Undetermined</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>Second iteration</h4>

Review comment:
       To keep the focus, you can remove the flow files that are already determined from the later tables.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>
+
+The following table represents four flow files being batch processed by the processor. For simplicity reasons "splunk.responded.at" time
+will be provided in HH:MM:SS format instead of Epoch time. The "Maximum waiting time" (referred as TTL from now on) is set for 3 minutes
+and the processor is scheduled to run every minute. The files are penalized for 30 seconds. "Undetermined" relationship is loop backed to the processor.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>No value set</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>First iteration</h4>
+
+The current time is 10:48:05 so all of our files are within TTL. Flow file 1 does not have a correct "splunk.acknowledgment.id"
+so will be directed to the "Failure" relationship. The remaining files will be sent for Splunk ack check.
+
+With the following Splunk response "acks:{[0:true, 1:false, 2:false]}" flow file 2 will be directed to "Success" relationship
+and the rest of the files to the "Undetermined" relationship and penalized for 30 seconds.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+        <td>Failure</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>true</td>
+        <td>Success</td>
+
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>true</td>
+        <td>Undetermined</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>true</td>
+        <td>Undetermined</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>Second iteration</h4>
+
+The current time is 10:49:18 so both flow file 3 and flow file 4 are within TTL and they will be sent for Splunk ack check.
+
+With the following Splunk response "acks:{[1:false, 2:true]}" flow file 4 will be directed to "Success" relationship and
+flow file 3 to the "Undetermined" relationship and penalized for 30 seconds.
+In the meantime flow file 1 missing "Splunk acknowledgment id" has been corrected.

Review comment:
       I think this is something we can skip: we do not provide any mechanism to correct this attribute, so this situation is too hypothetical.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>
+
+The following table represents four flow files being batch processed by the processor. For simplicity reasons "splunk.responded.at" time
+will be provided in HH:MM:SS format instead of Epoch time. The "Maximum waiting time" (referred as TTL from now on) is set for 3 minutes
+and the processor is scheduled to run every minute. The files are penalized for 30 seconds. "Undetermined" relationship is loop backed to the processor.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>No value set</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>First iteration</h4>
+
+The current time is 10:48:05 so all of our files are within TTL. Flow file 1 does not have a correct "splunk.acknowledgment.id"
+so will be directed to the "Failure" relationship. The remaining files will be sent for Splunk ack check.
+
+With the following Splunk response "acks:{[0:true, 1:false, 2:false]}" flow file 2 will be directed to "Success" relationship
+and the rest of the files to the "Undetermined" relationship and penalized for 30 seconds.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+        <td>Failure</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>true</td>
+        <td>Success</td>
+
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>true</td>
+        <td>Undetermined</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>true</td>
+        <td>Undetermined</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>Second iteration</h4>
+
+The current time is 10:49:18 so both flow file 3 and flow file 4 are within TTL and they will be sent for Splunk ack check.

Review comment:
       As your example is pretty exhaustive, it might be worth mentioning that the processor might be triggered during the penalisation period. If no further flow files have arrived, it will abruptly finish its activation; if new FFs have arrived, those will be processed (in this case, before the penalised ones).

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -46,10 +46,21 @@ <h3>Unacknowledged and undetermined cases</h3>
     includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries,
     QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk
     within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship.
-    Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is
-    triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
-    Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might
-    result some false negative result as in case under higher load, Splunk server might index slower than it is expected.
+</p>
+
+<p>
+    Acknowledgment status of flow files being in the "undetermined" relationship are checked with Splunk and regardless of the result
+    an "ack.checked.at.splunk" attribute with true value will be put on them ensuring at least one acknowledgment check is happening
+    even outside of "Maximum waiting time".
+    Flow files having both their "ack.checked.at.splunk" attribute equals to true and being outside of the time range will be
+    transferred to the "unacknowledged" relationship next time the processor is triggered.
+</p>
+
+<p>
+    In order to determine if the indexing of
+    a given event is within the waiting time, the Unix Epoch of the original Splunk response is stored in the attribute "splunk.responded.at".

Review comment:
       I would go with: the arrival time of the original Splunk response is stored in the attribute "splunk.responded.at" in Unix Epoch format.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>

Review comment:
       Great idea to add an example! 👍 

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>
+
+The following table represents four flow files being batch processed by the processor. For simplicity reasons "splunk.responded.at" time
+will be provided in HH:MM:SS format instead of Epoch time. The "Maximum waiting time" (referred as TTL from now on) is set for 3 minutes

Review comment:
       I think "will be presented" would communicate the intent better

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>
+
+The following table represents four flow files being batch processed by the processor. For simplicity reasons "splunk.responded.at" time
+will be provided in HH:MM:SS format instead of Epoch time. The "Maximum waiting time" (referred as TTL from now on) is set for 3 minutes
+and the processor is scheduled to run every minute. The files are penalized for 30 seconds. "Undetermined" relationship is loop backed to the processor.
+
+<table>
+    <tbody>
+    <tr>
+        <th>Flow file id</th>
+        <th>Splunk acknowledgement id</th>
+        <th>Splunk responded at</th>
+        <th>Ack checked at Splunk</th>
+        <th>Relationship</th>
+    </tr>
+    <tr>
+        <td>flow file 1</td>
+        <td>null</td>
+        <td>10:46:02</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 2</td>
+        <td>0</td>
+        <td>10:46:18</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 3</td>
+        <td>1</td>
+        <td>10:47:04</td>
+        <td>No value set</td>
+    </tr>
+    <tr>
+        <td>flow file 4</td>
+        <td>2</td>
+        <td>10:47:23</td>
+        <td>No value set</td>
+    </tr>
+    </tbody>
+</table>
+
+<h4>First iteration</h4>
+
+The current time is 10:48:05 so all of our files are within TTL. Flow file 1 does not have a correct "splunk.acknowledgment.id"

Review comment:
       I think it is a bit challenging to read "flow file 1" within a paragraph. I would suggest either highlighting it in some way (italic, bold) or referring to the flow files as #1, #2 and so on.
   
   A minor clarification: there is no validation on the value of "splunk.acknowledgment.id", thus setting it to null is considered as "no attribute is set". Just to be sure, I made a quick test, and when the ackId is extracted, a null results in an Optional#empty, just like when it is not set at all. As a result, I would suggest rephrasing it to "not set".
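
The behaviour described in this clarification can be illustrated with a small standalone sketch. `AckIdExtraction` and `extractLong` are hypothetical stand-ins written for this example and are not taken from the processor; a plain `Map` stands in for the flow file attributes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch only: AckIdExtraction and extractLong are hypothetical stand-ins.
final class AckIdExtraction {

    // A missing attribute (null) and an unparsable value both end up as Optional.empty().
    static Optional<Long> extractLong(final String value) {
        try {
            return Optional.ofNullable(value).map(Long::valueOf);
        } catch (final NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> attributes = new HashMap<>();
        // The attribute was never set, so the lookup returns null.
        System.out.println(extractLong(attributes.get("splunk.acknowledgement.id")).isPresent()); // prints false

        attributes.put("splunk.acknowledgement.id", "2");
        System.out.println(extractLong(attributes.get("splunk.acknowledgement.id")).get()); // prints 2
    }
}
```

Under this sketch, a caller cannot distinguish "attribute absent" from "attribute present but not a number", which matches the suggestion to describe the case simply as "not set".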

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
##########
@@ -72,5 +83,195 @@ <h3>Performance</h3>
     batch but the processor might execute the query with less number of undetermined events.
 </p>
 
+<h2>Example</h2>
+
+The following table represents four flow files being batch processed by the processor. For simplicity reasons "splunk.responded.at" time
+will be provided in HH:MM:SS format instead of Epoch time. The "Maximum waiting time" (referred as TTL from now on) is set for 3 minutes
+and the processor is scheduled to run every minute. The files are penalized for 30 seconds. "Undetermined" relationship is loop backed to the processor.

Review comment:
       Minor: looped back sounds more natural







[GitHub] [nifi] timeabarna commented on pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#issuecomment-815477807


   @tpalfy Thanks Tamas, PR has been updated with the Property changes





[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576800712



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);

Review comment:
       Thanks, extracted it as proposed







[GitHub] [nifi] timeabarna commented on pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#issuecomment-780411119


   @simonbence Documentation has been updated.





[GitHub] [nifi] tpalfy edited a comment on pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
tpalfy edited a comment on pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#issuecomment-816765079


   LGTM+1.
   
   Thanks for the contribution.
   Merged into main.
   





[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576800889



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);
+                        session.getProvenanceReporter().modifyAttributes(toTransfer, "ackChecked attribute has been modified to true.");

Review comment:
       Removed the event logging.







[GitHub] [nifi] tpalfy commented on pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
tpalfy commented on pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#issuecomment-816765079


   LGTM+1.
   
   Thanks for the contribution.
   Merged into master.
   





[GitHub] [nifi] tpalfy commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r585637387



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +244,18 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static boolean isAlreadyChecked(FlowFile flowFile) {
+        return extractBoolean(flowFile.getAttribute(ACK_CHECKED_ATTRIBUTE));
+    }
+
+    private static boolean extractBoolean(final String value) {
+        return Boolean.parseBoolean(value);
+    }
+
+    private void setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(ACK_CHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));
+        session.putAllAttributes(flowFile, attributes);

Review comment:
       ```suggestion
           return session.putAttribute(flowFile, ACK_CHECKED_ATTRIBUTE, Boolean.toString(true));
   ```
   `putAttribute` (and `putAllAttributes`) are _not_ void methods.
   Shouldn't we work with the returned `flowFile` in the later parts of the code?
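
A minimal sketch of why the returned reference matters, assuming an immutable record type. `ImmutableRecord`, `withAttribute`, and `ReturnedReferenceDemo` are hypothetical stand-ins for illustration; they are not the NiFi `FlowFile` or `ProcessSession` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an immutable flow file; NOT the NiFi FlowFile interface.
final class ImmutableRecord {
    private final Map<String, String> attributes;

    ImmutableRecord(final Map<String, String> attributes) {
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    String getAttribute(final String key) {
        return attributes.get(key);
    }

    // Same shape as a non-void putAttribute: the receiver is left untouched and
    // the updated state exists only in the returned instance.
    ImmutableRecord withAttribute(final String key, final String value) {
        final Map<String, String> copy = new HashMap<>(attributes);
        copy.put(key, value);
        return new ImmutableRecord(copy);
    }
}

final class ReturnedReferenceDemo {
    static final String ACK_CHECKED_ATTRIBUTE = "ack.checked.at.splunk";

    // Callers must continue with the returned record, not with the argument.
    static ImmutableRecord markChecked(final ImmutableRecord record) {
        return record.withAttribute(ACK_CHECKED_ATTRIBUTE, Boolean.toString(true));
    }
}
```

With a void helper the updated instance would be discarded, and later code would still see the record without the flag.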







[GitHub] [nifi] simonbence commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576630600



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -57,6 +58,7 @@
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACKCHECKED_ATTRIBUTE = "ack.checked";

Review comment:
       I would prefix this with "splunk." or something similar, because later processors will have no context about what kind of acknowledgement this is.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());

Review comment:
       I would go with an approach that keeps `final` in place. For example, introduce a helper method that extracts the `toTransfer` flow file from the `undetermined` map, enriches it with the attribute internally, and returns it.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

Review comment:
       If you care about the HashMap size (which is a good thing :) ), size the map for flowFile.getAttributes().size() + 1 entries up front, as you will extend the map. ;) 
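
A small sketch of the sizing hint, assuming plain `java.util` maps. `AttributeCopy` and `copyWithExtra` are hypothetical names. The argument to `new HashMap<>(int)` is an initial capacity, so whether a resize is fully avoided still depends on the load factor (0.75 by default).

```java
import java.util.HashMap;
import java.util.Map;

final class AttributeCopy {
    // Copies the source map and adds one more entry, reserving room for it up front.
    static Map<String, String> copyWithExtra(final Map<String, String> source,
                                             final String key, final String value) {
        final Map<String, String> copy = new HashMap<>(source.size() + 1);
        copy.putAll(source);
        copy.put(key, value);
        return copy;
    }
}
```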

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);

Review comment:
       This extraction logic looks the same as above. It would be better not to duplicate it (in case we want to change the behaviour later). I would suggest a method that simply tells whether a flow file has the flag set; the value is irrelevant for now anyway. Something like: `private static boolean isAlreadyChecked(FlowFile ff)`. 
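
A sketch of such a helper, assuming the attributes are available as a plain `Map<String, String>`. `AckFlag` is a hypothetical class, and the map stands in for the flow file. `Boolean.parseBoolean` returns false for null, so a missing attribute needs no Optional handling.

```java
import java.util.Map;

final class AckFlag {
    // Attribute name as it appears in the hunk under review.
    static final String ACKCHECKED_ATTRIBUTE = "ack.checked";

    // True only when the attribute is present and equals "true" (case-insensitive).
    static boolean isAlreadyChecked(final Map<String, String> attributes) {
        return Boolean.parseBoolean(attributes.get(ACKCHECKED_ATTRIBUTE));
    }
}
```

Both the TTL condition and the response handling can then call the same method, so a later change of behaviour happens in one place.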

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);
+                        session.getProvenanceReporter().modifyAttributes(toTransfer, "ackChecked attribute has been modified to true.");

Review comment:
       I am not sure we need to store this as a provenance event (though it might not be an issue).

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
##########
@@ -121,16 +121,44 @@ public void testMoreIncomingFlowFileThanQueryLimit() throws Exception {
     }
 
     @Test
-    public void testTimedOutEvents() throws Exception {
+    public void testAckCheckedIsTrueAndFlowFileWithTimedOutEvents() throws Exception {
         // when
-        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
+        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2), true));
         testRunner.run();
 
         // then
         Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
         testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
     }
 
+    @Test
+    public void testAckCheckedIsFalseAndTimedOutEventFlowFileWithAcknowledgeResponse() throws Exception {
+        // when
+        final Map<Integer, Boolean> acks = new HashMap<>();

Review comment:
       Minor: you can make it shorter and more readable with Collections.singletonMap.
   Also: this should be part of the given block.
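
For illustration, a sketch of the one-entry fixture, assuming the `Map<Integer, Boolean>` type from the test above. `SingleAckFixture` and `singleAck` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

final class SingleAckFixture {
    // Builds an immutable map with exactly one ack id and its acknowledged state.
    static Map<Integer, Boolean> singleAck(final int ackId, final boolean acknowledged) {
        return Collections.singletonMap(ackId, acknowledged);
    }
}
```

The returned map is immutable, so it only fits fixtures that are never modified after creation.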

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put( ACKCHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));

Review comment:
       Minor: unnecessary space after `put(`

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +164,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         for (final FlowFile flowFile : flowFiles)  {
             final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
             final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+            final Optional<Boolean> ackChecked = extractBoolean(flowFile.getAttribute(ACKCHECKED_ATTRIBUTE));
 
             if (!sentAt.isPresent() || !ackId.isPresent()) {
                 getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
+            } else if (ackChecked.orElse(Boolean.FALSE) && sentAt.get() + ttl < currentTime) {

Review comment:
       For readability, I would suggest something simpler in the condition. Maybe the simplest way would be to not use Optional when you define ackChecked, but to convert it into a boolean in the first place (in other words: treat a non-existent attribute as false, not as an empty Optional).







[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576800469



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());

Review comment:
       Thanks, added back final







[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576801324



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
##########
@@ -121,16 +121,44 @@ public void testMoreIncomingFlowFileThanQueryLimit() throws Exception {
     }
 
     @Test
-    public void testTimedOutEvents() throws Exception {
+    public void testAckCheckedIsTrueAndFlowFileWithTimedOutEvents() throws Exception {
         // when
-        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
+        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2), true));
         testRunner.run();
 
         // then
         Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
         testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
     }
 
+    @Test
+    public void testAckCheckedIsFalseAndTimedOutEventFlowFileWithAcknowledgeResponse() throws Exception {
+        // when
+        final Map<Integer, Boolean> acks = new HashMap<>();

Review comment:
       Changed to singletonMap







[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576801154



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put( ACKCHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));

Review comment:
       space removed







[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576800085



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -57,6 +58,7 @@
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACKCHECKED_ATTRIBUTE = "ack.checked";

Review comment:
       Thanks Bence, corrected in next commit







[GitHub] [nifi] simonbence commented on pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
simonbence commented on pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#issuecomment-784949769


   LGTM, thank you for the adjustments!





[GitHub] [nifi] tpalfy commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r585783440



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -54,9 +56,13 @@
 @ReadsAttributes({
         @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."),
         @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")})
+@WritesAttributes({
+        @WritesAttribute(attribute = "ack.checked.at.splunk", description = "Contains a boolean value representing whether Splunk acknowledgement check has happened. If not set considered as false.")
+})
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACK_CHECKED_ATTRIBUTE = "ack.checked.at.splunk";

Review comment:
       ```suggestion
   ```

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +168,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         for (final FlowFile flowFile : flowFiles)  {
             final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
             final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+            final boolean ackChecked = isAlreadyChecked(flowFile);
 
             if (!sentAt.isPresent() || !ackId.isPresent()) {
                 getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
+            } else if (ackChecked && sentAt.get() + ttl < currentTime) {
                 session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);

Review comment:
       ```suggestion
   ```

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -195,6 +202,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {

Review comment:
       ```suggestion
                   splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) -> {
                       final FlowFile toTransfer = undetermined.get(flowFileId);
   
                       if (isAcknowledged) {
                           session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED);
                       } else {
                           final Long sentAt = extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get();
                           if (sentAt + ttl < currentTime) {
                               session.transfer(toTransfer, RELATIONSHIP_UNACKNOWLEDGED);
                           } else {
                               session.penalize(toTransfer);
                               session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED);
                           }
                       }
                   });
   ```
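
A self-contained sketch of the routing in the suggestion above, assuming plain maps and relationship names as strings. `AckRouting`, `route`, and `routeAll` are hypothetical; the real processor transfers flow files through the session instead of returning labels.

```java
import java.util.HashMap;
import java.util.Map;

final class AckRouting {
    // Acknowledged wins; otherwise the TTL decides between giving up and retrying.
    static String route(final boolean acknowledged, final long sentAt,
                        final long ttl, final long currentTime) {
        if (acknowledged) {
            return "acknowledged";
        }
        return sentAt + ttl < currentTime ? "unacknowledged" : "undetermined";
    }

    // Map.forEach hands key and value to the lambda directly, so no entrySet()
    // and getKey()/getValue() calls are needed. Assumes every ack id has a sentAt.
    static Map<Long, String> routeAll(final Map<Long, Boolean> acks,
                                      final Map<Long, Long> sentAtById,
                                      final long ttl, final long currentTime) {
        final Map<Long, String> routed = new HashMap<>();
        acks.forEach((ackId, isAcknowledged) ->
                routed.put(ackId, route(isAcknowledged, sentAtById.get(ackId), ttl, currentTime)));
        return routed;
    }
}
```

Because the TTL is evaluated only after a response has arrived, an expired flow file still gets at least one acknowledgement check.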

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +244,18 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static boolean isAlreadyChecked(FlowFile flowFile) {
+        return extractBoolean(flowFile.getAttribute(ACK_CHECKED_ATTRIBUTE));
+    }
+
+    private static boolean extractBoolean(final String value) {
+        return Boolean.parseBoolean(value);
+    }
+
+    private void setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(ACK_CHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));
+        session.putAllAttributes(flowFile, attributes);
+    }

Review comment:
       ```suggestion
   ```







[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r589361101



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -54,9 +56,13 @@
 @ReadsAttributes({
         @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = "The indexing acknowledgement id provided by Splunk."),
         @ReadsAttribute(attribute = "splunk.responded.at", description = "The time of the response of put request for Splunk.")})
+@WritesAttributes({
+        @WritesAttribute(attribute = "ack.checked.at.splunk", description = "Contains a boolean value representing whether Splunk acknowledgement check has happened. If not set considered as false.")
+})
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACK_CHECKED_ATTRIBUTE = "ack.checked.at.splunk";

Review comment:
       Thanks @tpalfy for your help, refactored based on your recommendation.







[GitHub] [nifi] timeabarna commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
timeabarna commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576800326



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +164,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         for (final FlowFile flowFile : flowFiles)  {
             final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
             final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+            final Optional<Boolean> ackChecked = extractBoolean(flowFile.getAttribute(ACKCHECKED_ATTRIBUTE));
 
             if (!sentAt.isPresent() || !ackId.isPresent()) {
                 getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
+            } else if (ackChecked.orElse(Boolean.FALSE) && sentAt.get() + ttl < currentTime) {

Review comment:
       Thanks Bence, modified it to primitive in next commit







[GitHub] [nifi] simonbence commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576641889



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

Review comment:
       If you care about the HashMap size (which is a good thing :) ), size the map for flowFile.getAttributes().size() + 1 entries up front, as you will extend the map. ;) 







[GitHub] [nifi] asfgit closed pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4824:
URL: https://github.com/apache/nifi/pull/4824


   





[GitHub] [nifi] simonbence commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r585660958



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +244,18 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static boolean isAlreadyChecked(FlowFile flowFile) {
+        return extractBoolean(flowFile.getAttribute(ACK_CHECKED_ATTRIBUTE));
+    }
+
+    private static boolean extractBoolean(final String value) {
+        return Boolean.parseBoolean(value);
+    }
+
+    private void setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put(ACK_CHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));
+        session.putAllAttributes(flowFile, attributes);

Review comment:
       You are correct, FFs are immutable by design.



