Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/16 08:49:05 UTC

[GitHub] [nifi] simonbence commented on a change in pull request #4824: NIFI-8176 adding ackChecked flag to QuerySplunkIndexingStatus.onTrigg…

simonbence commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576630600



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -57,6 +58,7 @@
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACKCHECKED_ATTRIBUTE = "ack.checked";

Review comment:
       I would prefix this with "splunk." or something similar, because downstream processors will have no context about what kind of acknowledgement this is.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());

Review comment:
       I would go with an approach that keeps `final` in place. For example, introduce a helper method that extracts the toTransfer flow file from the undetermined map, enriches it with the attribute internally, and returns it.
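
       A minimal sketch of that helper idea, not the actual processor code. It assumes a flow file can be reduced to its attribute map for illustration; the class name `ToTransferSketch` and the method name `getCheckedFlowFile` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a "flow file" is represented only by its attribute map.
final class ToTransferSketch {
    static final String ACK_CHECKED_ATTRIBUTE = "ack.checked";

    // Looks up the entry for the given ack id and returns an enriched copy,
    // so the caller can assign the result to a final variable in one step.
    static Map<String, String> getCheckedFlowFile(
            final Map<Long, Map<String, String>> undetermined, final Long ackId) {
        final Map<String, String> enriched = new HashMap<>(undetermined.get(ackId));
        enriched.put(ACK_CHECKED_ATTRIBUTE, String.valueOf(true));
        return enriched;
    }
}
```

       In the processor itself the enrichment would presumably go through the session (ProcessSession.putAttribute returns the updated FlowFile), so the lambda could keep a single `final FlowFile toTransfer = ...` assignment.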

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

Review comment:
       If you care about the HashMap size (which is a good thing :) ), account for one extra entry on top of flowFile.getAttributes() up front, as you will extend the map. ;)
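
       One way this sizing could look, as a sketch under assumptions: the helper name `copyWithExtra` is hypothetical, and the capacity formula reflects HashMap's default load factor of 0.75 rather than anything stated in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

final class AttributeCopySketch {
    // Hypothetical helper: copies the attributes and adds one entry,
    // sizing the new map so the extra entry does not force a resize.
    static Map<String, String> copyWithExtra(
            final Map<String, String> attributes, final String key, final String value) {
        // The int constructor takes a capacity, not an entry count. With the
        // default load factor the table grows once size exceeds capacity * 0.75.
        final int expectedSize = attributes.size() + 1;
        final int capacity = (int) (expectedSize / 0.75f) + 1;
        final Map<String, String> copy = new HashMap<>(capacity);
        copy.putAll(attributes);
        copy.put(key, value);
        return copy;
    }
}
```

       Passing `attributes.size() + 1` directly as the capacity would also work functionally; it would just leave less headroom before the first resize.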

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);

Review comment:
       This extraction logic looks to be the same as above. It would be better not to duplicate it (in case we want to change the behaviour later). I would suggest a method that simply tells whether a flow file has the flag set. The value is irrelevant now anyway. Something like: `private static boolean isAlreadyChecked(FlowFile ff)`.
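
       A sketch of such a predicate, assuming the flow file is represented by its attribute map so the example stays self-contained. The class name is hypothetical, and parsing the value (rather than only checking for presence) is an assumption chosen to match the existing `extractBoolean(...).orElse(Boolean.FALSE)` behaviour.

```java
import java.util.Map;

final class AckCheckedSketch {
    static final String ACK_CHECKED_ATTRIBUTE = "ack.checked";

    // Single place that decides whether the flag counts as set.
    // Boolean.parseBoolean returns false for null, so a missing attribute is "not checked".
    static boolean isAlreadyChecked(final Map<String, String> attributes) {
        return Boolean.parseBoolean(attributes.get(ACK_CHECKED_ATTRIBUTE));
    }
}
```

       If only the presence of the attribute matters, the body could instead be `attributes.containsKey(ACK_CHECKED_ATTRIBUTE)`; either way the decision lives in one method.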

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);
+                        session.getProvenanceReporter().modifyAttributes(toTransfer, "ackChecked attribute has been modified to true.");

Review comment:
       I am not sure we need to store this as a provenance event (however, it might not be an issue).

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
##########
@@ -121,16 +121,44 @@ public void testMoreIncomingFlowFileThanQueryLimit() throws Exception {
     }
 
     @Test
-    public void testTimedOutEvents() throws Exception {
+    public void testAckCheckedIsTrueAndFlowFileWithTimedOutEvents() throws Exception {
         // when
-        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
+        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2), true));
         testRunner.run();
 
         // then
         Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
         testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
     }
 
+    @Test
+    public void testAckCheckedIsFalseAndTimedOutEventFlowFileWithAcknowledgeResponse() throws Exception {
+        // when
+        final Map<Integer, Boolean> acks = new HashMap<>();

Review comment:
       Minor: you can make it shorter and more readable with Collections.singletonMap.
       Also: this should be part of the given block.
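
       For illustration, a sketch of the singletonMap variant. It assumes the test registers exactly one ack id; the helper name `singleAck` and the values used are hypothetical.

```java
import java.util.Collections;
import java.util.Map;

final class SingletonAckSketch {
    // Builds the one-entry ack map in a single expression.
    // Note that the returned map is immutable.
    static Map<Integer, Boolean> singleAck(final int ackId, final boolean acknowledged) {
        return Collections.singletonMap(ackId, acknowledged);
    }
}
```

       In the test this would sit under the `// given` comment, for example `final Map<Integer, Boolean> acks = Collections.singletonMap(1, true);`.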

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put( ACKCHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));

Review comment:
       Minor: unnecessary space after `put(`

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +164,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         for (final FlowFile flowFile : flowFiles)  {
             final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
             final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+            final Optional<Boolean> ackChecked = extractBoolean(flowFile.getAttribute(ACKCHECKED_ATTRIBUTE));
 
             if (!sentAt.isPresent() || !ackId.isPresent()) {
                 getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
+            } else if (ackChecked.orElse(Boolean.FALSE) && sentAt.get() + ttl < currentTime) {

Review comment:
       For readability, I would suggest something simpler in the condition. Maybe the simplest way would be to not use Optional when you define ackChecked, but to convert it into a boolean in the first place (in other words: treat a non-existent attribute as false, not as an empty Optional).
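
       A sketch of how the condition might read with a plain boolean. The method name `isTimedOut` and its parameter list are hypothetical; only the shape of the condition is taken from the diff.

```java
final class TimeoutConditionSketch {
    // The attribute is converted to a primitive boolean once;
    // Boolean.parseBoolean(null) is false, so no Optional is needed.
    static boolean isTimedOut(final String ackCheckedAttribute,
                              final long sentAt, final long ttl, final long currentTime) {
        final boolean ackChecked = Boolean.parseBoolean(ackCheckedAttribute);
        return ackChecked && sentAt + ttl < currentTime;
    }
}
```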



