You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tp...@apache.org on 2021/04/09 15:27:29 UTC

[nifi] branch main updated: NIFI-8176 - Move the timeout check after we process the response from Splunk to make sure we poll for acknowledgement at least once. (No need for flag.)

This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bfd964b  NIFI-8176 - Move the timeout check after we process the response from Splunk to make sure we poll for acknowledgement at least once. (No need for flag.)
bfd964b is described below

commit bfd964b9c7fdac7190780dc03a5fe02ce754b66b
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Feb 16 08:40:35 2021 +0100

    NIFI-8176 - Move the timeout check after we process the response from Splunk to make sure we poll for acknowledgement at least once. (No need for flag.)
    
    This closes #4824.
    
    Signed-off-by: Tamas Palfy <ta...@gmail.com>
---
 .../processors/splunk/QuerySplunkIndexingStatus.java   | 18 ++++++++++--------
 .../apache/nifi/processors/splunk/SplunkAPICall.java   | 14 +++++++++++---
 .../additionalDetails.html                             |  5 +++--
 .../splunk/TestQuerySplunkIndexingStatus.java          | 12 ------------
 4 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
index 9ed5210..747e5e4 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
@@ -167,8 +167,6 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall {
                 getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
-                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
             } else {
                 undetermined.put(ackId.get(), flowFile);
             }
@@ -193,14 +191,18 @@ public class QuerySplunkIndexingStatus extends SplunkAPICall {
             if (responseMessage.getStatus() == 200) {
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
-                splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
-
-                    if (result.getValue()) {
+                splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) -> {
+                    final FlowFile toTransfer = undetermined.get(flowFileId);
+                    if (isAcknowledged) {
                         session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED);
                     } else {
-                        session.penalize(toTransfer);
-                        session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED);
+                        final Long sentAt = extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get();
+                        if (sentAt + ttl < currentTime) {
+                            session.transfer(toTransfer, RELATIONSHIP_UNACKNOWLEDGED);
+                        } else {
+                            session.penalize(toTransfer);
+                            session.transfer(toTransfer, RELATIONSHIP_UNDETERMINED);
+                        }
                     }
                 });
             } else {
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
index 46b6de8..8cf4f8a 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
@@ -54,6 +54,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
             .name("Scheme")
+            .displayName("Scheme")
             .description("The scheme for connecting to Splunk.")
             .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
             .defaultValue(HTTPS_SCHEME)
@@ -62,6 +63,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
             .name("Hostname")
+            .displayName("Hostname")
             .description("The ip address or hostname of the Splunk server.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("localhost")
@@ -71,15 +73,17 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor PORT = new PropertyDescriptor
             .Builder().name("Port")
-            .description("The HTTP Port Number of the Splunk server.")
+            .displayName("HTTP Event Collector Port")
+            .description("The HTTP Event Collector HTTP Port Number.")
             .required(true)
             .addValidator(StandardValidators.PORT_VALIDATOR)
-            .defaultValue("9088")
+            .defaultValue("8088")
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
     static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
             .name("Security Protocol")
+            .displayName("Security Protocol")
             .description("The security protocol to use for communicating with Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
@@ -88,6 +92,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
             .name("Owner")
+            .displayName("Owner")
             .description("The owner to pass to Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
@@ -96,7 +101,8 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
             .name("Token")
-            .description("The token to pass to Splunk.")
+            .displayName("HTTP Event Collector Token")
+            .description("HTTP Event Collector token starting with the string Splunk. For example Splunk 1234578-abcd-1234-abcd-1234abcd")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -104,6 +110,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
             .name("Username")
+            .displayName("Username")
             .description("The username to authenticate to Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
@@ -112,6 +119,7 @@ abstract class SplunkAPICall extends AbstractProcessor {
 
     static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
             .name("Password")
+            .displayName("Password")
             .description("The password to authenticate to Splunk.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(false)
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
index 9d81de8..50f6f87 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.QuerySplunkIndexingStatus/additionalDetails.html
@@ -46,8 +46,9 @@
     includes unsuccessful or ongoing indexing and unknown acknowledgement identifiers. In order to avoid infinite tries,
     QuerySplunkIndexingStatus gives user the possibility to set a "Maximum waiting time". Results with value of false from Splunk
     within the specified waiting time will be handled as "undetermined" and are transferred to the "undetermined" relationship.
-    Flow files outside of this time range will be transferred to the "unacknowledged" relationship next time the processor is
-    triggered. In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
+    Flow files outside of this time range are queried as well and are transferred to either the "acknowledged" or the
+    "unacknowledged" relationship, as determined by the Splunk response.
+    In order to determine if the indexing of a given event is within the waiting time, the Unix Epoch of the original
     Splunk response is stored in the attribute "splunk.responded.at". Setting "Maximum waiting time" too low might
     result some false negative result as in case under higher load, Splunk server might index slower than it is expected.
 </p>
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
index 2b91f17..bd33947 100644
--- a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
@@ -39,7 +39,6 @@ import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -121,17 +120,6 @@ public class TestQuerySplunkIndexingStatus {
     }
 
     @Test
-    public void testTimedOutEvents() throws Exception {
-        // when
-        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
-        testRunner.run();
-
-        // then
-        Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
-        testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
-    }
-
-    @Test
     public void testWhenFlowFileIsLackOfNecessaryAttributes() throws Exception {
         // when
         testRunner.enqueue(EVENT);