Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/12/08 15:04:20 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #4714: NIFI-7801 Adding support for HTTP based Splunk put and indexed acknowledgement

markap14 commented on a change in pull request #4714:
URL: https://github.com/apache/nifi/pull/4714#discussion_r538411444



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.SendRawDataResponse;
+import org.apache.nifi.dto.splunk.SendRawDataSuccessResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http"})
+@CapabilityDescription("Sends events to the specified Splunk server over HTTP or HTTPS. Supports HEC Index Acknowledgement.")
+public class PutSplunkHTTP extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/raw";
+
+    static final PropertyDescriptor SOURCE = new PropertyDescriptor.Builder()
+            .name("source")
+            .displayName("Source")
+            .description("User-defined event source. Sets a default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder()
+            .name("source-type")
+            .displayName("Source Type")
+            .description("User-defined event sourcetype. Sets a default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
+            .name("host")
+            .displayName("Host")
+            .description("Specify with the host query string parameter. Sets a default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("index")
+            .displayName("Index")
+            .description("Index name. Specify with the index query string parameter. Sets a default for all events when unspecified.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("character-set")
+            .displayName("Character Set")
+            .description("The name of the character set.")
+            .required(true)
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(Charset.defaultCharset().name())
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
+            .name("content-type")
+            .displayName("Content Type")
+            .description(
+                    "The media type of the event sent to Splunk. " +
+                    "If not set, \"mime.type\" flow file attribute will be used. " +
+                    "In case of neither of them is specified, this information will not be sent to the server.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to the destination are sent out this relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_SUCCESS,
+            RELATIONSHIP_FAILURE)));
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        result.add(SOURCE);
+        result.add(SOURCE_TYPE);
+        result.add(HOST);
+        result.add(INDEX);
+        result.add(CONTENT_TYPE);
+        result.add(CHARSET);
+        return result;
+    }
+
+    private volatile String endpoint;
+    private volatile String contentType;
+    private volatile String charset;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+
+        if (context.getProperty(CONTENT_TYPE).isSet()) {
+            contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
+        }
+
+        charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
+
+        final Map<String, String> queryParameters = new HashMap<>();
+
+        if (context.getProperty(SOURCE_TYPE).isSet()) {
+            queryParameters.put("sourcetype", context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(SOURCE).isSet()) {
+            queryParameters.put("source", context.getProperty(SOURCE).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(HOST).isSet()) {
+            queryParameters.put("host", context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(INDEX).isSet()) {
+            queryParameters.put("index", context.getProperty(INDEX).evaluateAttributeExpressions().getValue());
+        }
+
+        if (queryParameters.isEmpty()) {
+            endpoint = ENDPOINT;
+        } else {
+            endpoint = ENDPOINT + '?' + queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + e.getValue()).collect(Collectors.joining("&"));

Review comment:
       Should probably be URL-encoding the user-defined values.
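
A minimal sketch of that suggestion, assuming the JDK's `java.net.URLEncoder`; the helper class and method names are hypothetical:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

final class EndpointBuilder {

    // Builds the collector endpoint, URL-encoding every user-defined key
    // and value before joining them into the query string.
    static String build(final String basePath, final Map<String, String> queryParameters) {
        if (queryParameters.isEmpty()) {
            return basePath;
        }

        return basePath + '?' + queryParameters.entrySet().stream()
                .map(e -> encode(e.getKey()) + '=' + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String encode(final String value) {
        try {
            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
        } catch (final UnsupportedEncodingException e) {
            // UTF-8 support is mandated by the JVM specification.
            throw new IllegalStateException(e);
        }
    }
}
```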

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+        if (queryParameters.isEmpty()) {
+            endpoint = ENDPOINT;
+        } else {
+            endpoint = ENDPOINT + '?' + queryParameters.entrySet().stream().map(e -> e.getKey() + '=' + e.getValue()).collect(Collectors.joining("&"));
+        }
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {

Review comment:
       These should not be done in an @OnUnscheduled method because it can lead to a race condition that can result in values being `null`ed out while the Processor is still running. Instead, this should be done in a method that uses the `@OnStopped` annotation.
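
A minimal sketch of the suggested change, reusing the fields from the code under review; NiFi invokes `@OnStopped` methods only after every `onTrigger` thread has returned:

```java
import org.apache.nifi.annotation.lifecycle.OnStopped;

// Inside PutSplunkHTTP: clear the cached configuration only once the
// processor has fully stopped, so no running thread can observe nulls.
@OnStopped
public void onStopped() {
    contentType = null;
    charset = null;
    endpoint = null;
}
```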

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http"})
+@CapabilityDescription("Sends events to the specified Splunk server over HTTP or HTTPS. Supports HEC Index Acknowledgement.")

Review comment:
       The phrase "Sends events" makes me wonder what type of data is expected here. Are these JSON, logs, etc.? Does the data format matter, or is it sent as a bunch of bytes? Does NiFi parse these events in any way?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        contentType = null;
+        charset = null;
+        endpoint = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile incomingFlowFile = session.get();
+        FlowFile outgoingFlowFile = incomingFlowFile;
+        boolean success = false;
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(session, incomingFlowFile);
+            final ResponseMessage responseMessage = call(endpoint, requestMessage);
+            outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_status_code", String.valueOf(responseMessage.getStatus()));
+
+            switch (responseMessage.getStatus()) {
+                case 200:
+                    final SendRawDataSuccessResponse successResponse = extractResult(responseMessage.getContent(), SendRawDataSuccessResponse.class);
+
+                    if (successResponse.getCode() == 0) {
+                        outgoingFlowFile = enrichFlowFile(session, outgoingFlowFile, successResponse.getAckId());
+                        success = true;
+                    } else {
+                        outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_response_code", String.valueOf(successResponse.getCode()));
+                        getLogger().error("Putting data into Splunk was not successful: (" + successResponse.getCode() + ") " + successResponse.getText());
+                    }
+
+                    break;
+                case 503 : // HEC is unhealthy, queues are full
+                    context.yield();
+                default:
+                    final SendRawDataResponse response = extractResult(responseMessage.getContent(), SendRawDataResponse.class);
+                    getLogger().error("Putting data into Splunk was not successful: " + response.getText());
+            }
+        } catch (final Exception e) {
+            getLogger().error("Error during communication with Splunk: " + e.getMessage(), e);
+        } finally {
+            session.transfer(outgoingFlowFile, success ? RELATIONSHIP_SUCCESS : RELATIONSHIP_FAILURE);
+        }
+    }
+
+    private RequestMessage createRequestMessage(final ProcessSession session, final FlowFile flowFile) {
+        final RequestMessage requestMessage = new RequestMessage("POST");
+        final String flowFileContentType = Optional.ofNullable(contentType).orElse(flowFile.getAttribute("mime.type"));
+
+        if (flowFileContentType != null) {
+            requestMessage.getHeader().put("Content-Type", flowFileContentType);
+        }
+
+        requestMessage.setContent(extractTextMessageBody(flowFile, session, charset));
+        return requestMessage;
+    }
+
+    private String extractTextMessageBody(final FlowFile flowFile, final ProcessSession session, final String charset) {
+        final StringWriter writer = new StringWriter();
+        session.read(flowFile, in -> IOUtils.copy(in, writer, Charset.forName(charset)));

Review comment:
       Any processor that reads the entire contents of a FlowFile into memory should use the `@SystemResourceConsideration` annotation to document that Memory is an important Resource Consideration, because the entire FlowFile is loaded into memory.
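
A minimal sketch of the suggested annotation; the description wording is illustrative:

```java
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;

// Surfaces the memory cost in the generated processor documentation.
@SystemResourceConsideration(resource = SystemResource.MEMORY,
        description = "The entire contents of the incoming FlowFile are read into memory before the event is sent to Splunk.")
public class PutSplunkHTTP extends SplunkAPICall {
    // ...
}
```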

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (Long.valueOf(insertedAt) + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(Integer.valueOf(flowFile.getAttribute(ackIdAttributeName)), flowFile);
+            }
+        }
+
+        if (undetermined.isEmpty()) {
+            getLogger().info("There was no eligible flow file to send request to Splunk.");
+            return;
+        }
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(undetermined);
+
+            final ResponseMessage responseMessage = call(ENDPOINT, requestMessage);
+
+            if (responseMessage.getStatus() == 200) {
+                final EventIndexStatusResponse splunkResponse = extractResult(responseMessage.getContent(), EventIndexStatusResponse.class);
+
+                splunkResponse.getAcks().entrySet().stream().forEach(result -> {
+                    if (result.getValue()) {
+                        session.transfer(undetermined.get(result.getKey()), RELATIONSHIP_ACKNOWLEDGED);
+                    } else {
+                        session.penalize(undetermined.get(result.getKey()));
+                        session.transfer(undetermined.get(result.getKey()), RELATIONSHIP_UNDETERMINED);
+                    }
+                });
+            } else {
+                getLogger().error("Query index status was not successful because of (" + responseMessage.getStatus() + ") " + responseMessage.getContent());
+                context.yield();
+                session.transfer(undetermined.values(), RELATIONSHIP_UNDETERMINED);
+            }
+        } catch (final IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    private RequestMessage createRequestMessage(Map<Integer, FlowFile> undetermined) throws JsonProcessingException {
+        final RequestMessage requestMessage = new RequestMessage("POST");
+        requestMessage.getHeader().put("Content-Type", "application/json");
+        requestMessage.setContent(generateContent(undetermined));
+        return requestMessage;
+    }
+
+    private String generateContent(final Map<Integer, FlowFile> undetermined) throws JsonProcessingException {
+        final EventIndexStatusRequest splunkRequest = new EventIndexStatusRequest();
+        splunkRequest.setAcks(undetermined.keySet().stream().collect(Collectors.toList()));

Review comment:
       Should probably use `splunkRequest.setAcks(new ArrayList<>(undetermined.keySet()));` - the call to `.stream()` is quite expensive.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");

Review comment:
       Should indicate the FlowFile in the error message, and should use `{}` rather than concatenating strings.
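
A minimal sketch of the suggested logging call, using the `Object[]` overload of `ComponentLog.error`; the message wording is illustrative:

```java
// Parameterized logging that names the offending FlowFile instead of
// concatenating the message by hand.
getLogger().error("Attributes \"{}\" and \"{}\" are required but missing on {}",
        new Object[]{insertedAtAttributeName, ackIdAttributeName, flowFile});
```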

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (Long.valueOf(insertedAt) + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(Integer.valueOf(flowFile.getAttribute(ackIdAttributeName)), flowFile);
+            }
+        }
+
+        if (undetermined.isEmpty()) {
+            getLogger().info("There was no eligible flow file to send request to Splunk.");
+            return;
+        }
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(undetermined);
+
+            final ResponseMessage responseMessage = call(ENDPOINT, requestMessage);
+
+            if (responseMessage.getStatus() == 200) {
+                final EventIndexStatusResponse splunkResponse = extractResult(responseMessage.getContent(), EventIndexStatusResponse.class);
+
+                splunkResponse.getAcks().entrySet().stream().forEach(result -> {

Review comment:
       Can avoid the call to `stream()` and just use `...entrySet().forEach(result -> {`. Changing to a Stream doesn't buy us anything here, but it does add the cost of constructing the Stream pipeline.
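       For illustration, a minimal, self-contained sketch of the difference (using a plain `Map` stand-in, not the `splunkResponse` type from the diff):
   ```
   import java.util.HashMap;
   import java.util.Map;

   class ForEachExample {
       public static void main(String[] args) {
           final Map<Integer, Boolean> acks = new HashMap<>();
           acks.put(1, true);
           acks.put(2, false);

           // Current form: constructs a Stream pipeline only to iterate it once.
           acks.entrySet().stream().forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));

           // Suggested form: Iterable.forEach iterates directly, with no intermediate Stream.
           acks.entrySet().forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
       }
   }
   ```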

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile incomingFlowFile = session.get();
+        FlowFile outgoingFlowFile = incomingFlowFile;
+        boolean success = false;
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(session, incomingFlowFile);
+            final ResponseMessage responseMessage = call(endpoint, requestMessage);
+            outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_status_code", String.valueOf(responseMessage.getStatus()));

Review comment:
       Best to stick with the dotted-name convention (`splunk.status.code`), as that's the most common for NiFi attributes (`mime.type`, for example). Also, we should add a `@WritesAttribute` annotation documenting that this attribute is added.
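       A hedged sketch of how the attribute could be documented on the class (the attribute name and description below are suggestions, not final wording):
   ```
   import org.apache.nifi.annotation.behavior.WritesAttribute;
   import org.apache.nifi.annotation.behavior.WritesAttributes;

   @WritesAttributes({
           @WritesAttribute(attribute = "splunk.status.code",
                   description = "The HTTP status code returned by the Splunk HEC endpoint.")
   })
   public class PutSplunkHTTP extends SplunkAPICall {
       // ... processor body unchanged ...
   }
   ```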

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile incomingFlowFile = session.get();
+        FlowFile outgoingFlowFile = incomingFlowFile;
+        boolean success = false;
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(session, incomingFlowFile);
+            final ResponseMessage responseMessage = call(endpoint, requestMessage);
+            outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_status_code", String.valueOf(responseMessage.getStatus()));
+
+            switch (responseMessage.getStatus()) {
+                case 200:
+                    final SendRawDataSuccessResponse successResponse = extractResult(responseMessage.getContent(), SendRawDataSuccessResponse.class);
+
+                    if (successResponse.getCode() == 0) {
+                        outgoingFlowFile = enrichFlowFile(session, outgoingFlowFile, successResponse.getAckId());
+                        success = true;
+                    } else {
+                        outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_response_code", String.valueOf(successResponse.getCode()));

Review comment:
       Again, best to stick to the typical dotted-name convention here: `splunk.response.code`.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile incomingFlowFile = session.get();
+        FlowFile outgoingFlowFile = incomingFlowFile;

Review comment:
       Is there a reason for declaring an incomingFlowFile, and then an outgoingFlowFile? It appears that the two represent the same FlowFile. So the more typical pattern would be to simply use `flowFile` as a variable:
   ```
   FlowFile flowFile = session.get();
   if (flowFile == null) {
     return;
   }
   ```

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")

Review comment:
       Best to keep consistent Title Case for property display names (i.e., Maximum Waiting Time).
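       To spell the suggestion out, the same descriptor with only the displayName changed:
   ```
   // Copied from the diff above; only the displayName differs.
   static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
           .name("ttl")
           .displayName("Maximum Waiting Time")
           .description(
                   "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
                   "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
           .defaultValue("100 secs")
           .required(true)
           .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
           .build();
   ```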

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile incomingFlowFile = session.get();
+        FlowFile outgoingFlowFile = incomingFlowFile;
+        boolean success = false;
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(session, incomingFlowFile);
+            final ResponseMessage responseMessage = call(endpoint, requestMessage);
+            outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_status_code", String.valueOf(responseMessage.getStatus()));
+
+            switch (responseMessage.getStatus()) {
+                case 200:
+                    final SendRawDataSuccessResponse successResponse = extractResult(responseMessage.getContent(), SendRawDataSuccessResponse.class);
+
+                    if (successResponse.getCode() == 0) {
+                        outgoingFlowFile = enrichFlowFile(session, outgoingFlowFile, successResponse.getAckId());
+                        success = true;
+                    } else {
+                        outgoingFlowFile = session.putAttribute(outgoingFlowFile, "splunk_response_code", String.valueOf(successResponse.getCode()));
+                        getLogger().error("Putting data into Splunk was not successful: (" + successResponse.getCode() + ") " + successResponse.getText());
+                    }
+
+                    break;
+                case 503 : // HEC is unhealthy, queues are full
+                    context.yield();

Review comment:
       I assume that it is intentional that there is no `break` here and you intend for the code to fall-through? If so, can you add a simple 1-line comment indicating that the fall-through is intentional? Otherwise, it looks suspiciously like a bug.
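       Something like the following, as a minimal standalone illustration of a commented fall-through (not the processor code itself; that 503 is meant to end up in the failure handling is my reading of the diff):
   ```
   class FallThroughExample {
       static String route(final int status) {
           switch (status) {
               case 200:
                   return "success";
               case 503: // server is unhealthy, queues are full
                   // intentional fall-through: after yielding (omitted here),
                   // a 503 response is also treated as a failure
               default:
                   return "failure";
           }
       }

       public static void main(String[] args) {
           System.out.println(route(503)); // prints "failure"
       }
   }
   ```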

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final FlowFile incomingFlowFile = session.get();

Review comment:
       Need to check for `null` and return if so.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")

Review comment:
       Under what conditions might they be penalized?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")

Review comment:
       It is suggested not to set it too low. Is there any trade-off? Many users will see this and decide to set it to `999999999`, as there's no indication that a higher value may result in any concern.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);

Review comment:
       Should check if `flowFiles.isEmpty()` and if so return immediately to avoid any unnecessary processing. Would even move the call to `System.currentTimeMillis()` to after that check, as there's no need to make a system call if we're not going to use the result.
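
       For example (a sketch of the suggested ordering):

    ```
    final List<FlowFile> flowFiles = session.get(maxQuerySize);

    // Nothing to do; return before any further work, including the system call below
    if (flowFiles.isEmpty()) {
        return;
    }

    final long currentTime = System.currentTimeMillis();
    ```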

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {

Review comment:
       Again, needs @OnStopped annotation, not @OnUnscheduled
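
       For example (a sketch using the fields from the hunk above, plus the corresponding `org.apache.nifi.annotation.lifecycle.OnStopped` import):

    ```
    @OnStopped
    public void onStopped() {
        splunkServiceArguments = null;
        splunkService = null;
        requestChannel = null;
        ackIdAttributeName = null;
        insertedAtAttributeName = null;
    }
    ```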

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSplunkHTTP</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>PutSplunkHTTP</h2>
+
+<p>
+    This processor serves as a counterpart for PutSplunk processor. While the later solves communication using TCP and
+    UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or HTTPS. In this fashion, this processor
+    shows similarities with GetSplunk processor and the properties relevant to the connection with Splunk server are
+    identical. There are however some aspects unique for this processor:
+</p>
+
+<h3>Content details</h3>
+
+<p>
+    PutSplunkHTTP allows the user to specify some metadata about the event is being sent to the Splunk. These include: the
+    "Character Type" and the "Content Type" of the flow file content, using the matching properties. If the incoming

Review comment:
       I think that should read "Character Set", rather than "Character Type".

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (Long.valueOf(insertedAt) + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(Integer.valueOf(flowFile.getAttribute(ackIdAttributeName)), flowFile);
+            }
+        }
+
+        if (undetermined.isEmpty()) {
+            getLogger().info("There was no eligible flow file to send request to Splunk.");
+            return;
+        }
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(undetermined);
+
+            final ResponseMessage responseMessage = call(ENDPOINT, requestMessage);
+
+            if (responseMessage.getStatus() == 200) {
+                final EventIndexStatusResponse splunkResponse = extractResult(responseMessage.getContent(), EventIndexStatusResponse.class);
+
+                splunkResponse.getAcks().entrySet().stream().forEach(result -> {
+                    if (result.getValue()) {
+                        session.transfer(undetermined.get(result.getKey()), RELATIONSHIP_ACKNOWLEDGED);

Review comment:
       Several calls to `undetermined.get(result.getKey())` here. Probably best to just extract that out:
   ```
   FlowFile toTransfer = undetermined.get(result.getKey());
   ```
   and then reference that variable.
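
       A rough sketch of the whole lambda with that extraction; note that `ProcessSession.penalize` returns the penalized FlowFile, and that returned reference is the one that should be transferred:

    ```
    splunkResponse.getAcks().forEach((ackId, acknowledged) -> {
        final FlowFile toTransfer = undetermined.get(ackId);

        if (acknowledged) {
            session.transfer(toTransfer, RELATIONSHIP_ACKNOWLEDGED);
        } else {
            session.transfer(session.penalize(toTransfer), RELATIONSHIP_UNDETERMINED);
        }
    });
    ```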

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSplunkHTTP</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>PutSplunkHTTP</h2>
+
+<p>
+    This processor serves as a counterpart for PutSplunk processor. While the later solves communication using TCP and
+    UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or HTTPS. In this fashion, this processor
+    shows similarities with GetSplunk processor and the properties relevant to the connection with Splunk server are
+    identical. There are however some aspects unique for this processor:
+</p>
+
+<h3>Content details</h3>
+
+<p>
+    PutSplunkHTTP allows the user to specify some metadata about the event is being sent to the Splunk. These include: the
+    "Character Type" and the "Content Type" of the flow file content, using the matching properties. If the incoming
+    flow file has "mime.type" attribute, the processor will use it, expect the "Content Type" property is set, in which

Review comment:
       "expect" here i think should have been "unless"

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (Long.valueOf(insertedAt) + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(Integer.valueOf(flowFile.getAttribute(ackIdAttributeName)), flowFile);
+            }
+        }
+
+        if (undetermined.isEmpty()) {
+            getLogger().info("There was no eligible flow file to send request to Splunk.");
+            return;
+        }
+
+        try {
+            final RequestMessage requestMessage = createRequestMessage(undetermined);
+
+            final ResponseMessage responseMessage = call(ENDPOINT, requestMessage);
+
+            if (responseMessage.getStatus() == 200) {
+                final EventIndexStatusResponse splunkResponse = extractResult(responseMessage.getContent(), EventIndexStatusResponse.class);
+
+                splunkResponse.getAcks().entrySet().stream().forEach(result -> {
+                    if (result.getValue()) {
+                        session.transfer(undetermined.get(result.getKey()), RELATIONSHIP_ACKNOWLEDGED);
+                    } else {
+                        session.penalize(undetermined.get(result.getKey()));
+                        session.transfer(undetermined.get(result.getKey()), RELATIONSHIP_UNDETERMINED);
+                    }
+                });
+            } else {
+                getLogger().error("Query index status was not successful because of (" + responseMessage.getStatus() + ") " + responseMessage.getContent());
+                context.yield();
+                session.transfer(undetermined.values(), RELATIONSHIP_UNDETERMINED);
+            }
+        } catch (final IOException e) {
+            throw new ProcessException(e);

Review comment:
       Rather than throw a ProcessException here, we should transfer the FlowFiles to failure. If there's a timeout or Splunk is unreachable, we don't necessarily want to just backlog the data indefinitely but rather give the user the choice, so that they can retry X number of times, etc.
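
       Roughly (a sketch; this keeps a `yield` so the processor backs off while leaving the retry decision to the flow designer):

    ```
    } catch (final IOException e) {
        getLogger().error("Failed to query indexing status", e);
        context.yield();
        session.transfer(undetermined.values(), RELATIONSHIP_FAILURE);
    }
    ```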

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;

Review comment:
       Need to avoid null-ing out values in @OnUnscheduled. Use @OnStopped instead.
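
       i.e. something like:

    ```
    @OnStopped
    public void onStopped() {
        super.onStopped(); // assuming SplunkAPICall receives the same change
        maxQuerySize = null;
        ttl = null;
    }
    ```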

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (Long.valueOf(insertedAt) + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(Integer.valueOf(flowFile.getAttribute(ackIdAttributeName)), flowFile);
+            }
+        }
+
+        if (undetermined.isEmpty()) {
+            getLogger().info("There was no eligible flow file to send request to Splunk.");

Review comment:
       Probably best to make that a debug-level log message.
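
       For illustration, a minimal sketch of the change (same message, just a lower level):

       ```java
       if (undetermined.isEmpty()) {
           // An empty batch is routine and not operator-actionable, so debug keeps the logs quiet
           getLogger().debug("There was no eligible flow file to send request to Splunk.");
       }
       ```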

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")

Review comment:
       Should use `splunk.acknowledgement.id`. To be honest, I would actually remove this property altogether and use a well-known attribute name. I can't think of a scenario where the user would really need to change the value, and adding to the number of properties only makes things more confusing.
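
       A sketch of what that could look like - a shared constant on the common base class (the exact name and placement here are just illustrative):

       ```java
       abstract class SplunkAPICall extends AbstractProcessor {
           // Well-known attribute name understood by both PutSplunkHTTP and QuerySplunkIndexingStatus,
           // replacing the configurable "Splunk Acknowledgement Id Attribute Name" property
           static final String ACKNOWLEDGEMENT_ID_ATTRIBUTE = "splunk.acknowledgement.id";

           // ... rest of the class unchanged ...
       }
       ```

       PutSplunkHTTP would then write it with `flowFile = session.putAttribute(flowFile, ACKNOWLEDGEMENT_ID_ATTRIBUTE, String.valueOf(ackId));` and QuerySplunkIndexingStatus would read the same constant back.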

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {

Review comment:
       Should probably add a `@SeeAlso` annotation pointing to the PutSplunkHTTP processor
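
       That is only an import and one extra annotation, e.g.:

       ```java
       import org.apache.nifi.annotation.documentation.SeeAlso;

       @SeeAlso(PutSplunkHTTP.class)
       public class QuerySplunkIndexingStatus extends SplunkAPICall {
           // ... existing body unchanged ...
       }
       ```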

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()

Review comment:
       Again, same comments as above. Should use `splunk.sent.at` or even `splunk.sent.timestamp`. But it really doesn't need to be a Property at all - just use a well-known attribute name that both processors understand.
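
       Same kind of sketch as for the acknowledgement id above - a fixed, well-known name instead of a property (the name is illustrative):

       ```java
       abstract class SplunkAPICall extends AbstractProcessor {
           // Epoch-millisecond timestamp of when the event was sent to Splunk
           static final String SENT_AT_ATTRIBUTE = "splunk.sent.at";

           // ... rest of the class unchanged ...
       }
       ```

       PutSplunkHTTP would set it to `String.valueOf(System.currentTimeMillis())` right after a successful send, and QuerySplunkIndexingStatus would compare it against the configured maximum waiting time.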

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.EventIndexStatusRequest;
+import org.apache.nifi.dto.splunk.EventIndexStatusResponse;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http", "acknowledgement"})
+@CapabilityDescription("Queries Splunk server in order to acquire the status of indexing acknowledgement.")
+public class QuerySplunkIndexingStatus extends SplunkAPICall {
+    private static final String ENDPOINT = "/services/collector/ack";
+
+    static final Relationship RELATIONSHIP_ACKNOWLEDGED = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNACKNOWLEDGED = new Relationship.Builder()
+            .name("unacknowledged")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    static final Relationship RELATIONSHIP_UNDETERMINED = new Relationship.Builder()
+            .name("undetermined")
+            .description(
+                    "A FlowFile is transferred into this relationship when the acknowledgement state is not determined. " +
+                    "Flow files transferred into this relationship might be penalized!")
+            .build();
+
+    static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred into this relationship when the acknowledgement was not successful.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            RELATIONSHIP_ACKNOWLEDGED,
+            RELATIONSHIP_UNACKNOWLEDGED,
+            RELATIONSHIP_UNDETERMINED,
+            RELATIONSHIP_FAILURE
+    )));
+
+    static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
+            .name("ttl")
+            .displayName("Maximum waiting time")
+            .description(
+                    "The maximum time the service tries to acquire acknowledgement confirmation for an index, from the point of registration. " +
+                    "After the given amount of time, the service considers the index as not acknowledged and moves it into the output buffer as failed acknowledgement.")
+            .defaultValue("100 secs")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAX_QUERY_SIZE = new PropertyDescriptor.Builder()
+            .name("max-query-size")
+            .displayName("Maximum Query Size")
+            .description(
+                    "The maximum number of acknowledgement identifiers the service query status for in one batch. " +
+                    "It is suggested to not set it too low in order to reduce network communication.")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private volatile Integer maxQuerySize;
+    private volatile Integer ttl;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> result = new ArrayList<>();
+        final List<PropertyDescriptor> common = super.getSupportedPropertyDescriptors();
+        result.addAll(common);
+        result.add(TTL);
+        result.add(MAX_QUERY_SIZE);
+        return result;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+        maxQuerySize = context.getProperty(MAX_QUERY_SIZE).asInteger();
+        ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        super.onUnscheduled();
+        maxQuerySize = null;
+        ttl = null;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final long currentTime = System.currentTimeMillis();
+        final List<FlowFile> flowFiles = session.get(maxQuerySize);
+        final Map<Integer, FlowFile> undetermined = new HashMap<>();
+
+        for (final FlowFile flowFile : flowFiles)  {
+            final String insertedAt = flowFile.getAttribute(insertedAtAttributeName);
+            final String ackId = flowFile.getAttribute(ackIdAttributeName);
+
+            if (ackId == null || insertedAt == null) {
+                getLogger().error("Flow file attributes \"" + insertedAtAttributeName + "\" and \"" + ackIdAttributeName + "\" are needed!");
+                session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            } else if (Long.valueOf(insertedAt) + ttl < currentTime) {
+                session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
+            } else {
+                undetermined.put(Integer.valueOf(flowFile.getAttribute(ackIdAttributeName)), flowFile);

Review comment:
       Should just use `ackId` rather than calling `flowFile.getAttribute()` again. Also, consider that if the attribute is not an integer, this will fail with a rather unclear error message - same for `insertedAt` above. It might make sense to check for that and provide a clean error message, such as "The 'splunk.inserted.at' attribute is expected to be an integer value but was <xyz>"
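
       One way the `else` branch could read, keeping the existing names (a sketch; `insertedAt` deserves the same guard):

       ```java
       } else {
           try {
               undetermined.put(Integer.valueOf(ackId), flowFile);
           } catch (final NumberFormatException e) {
               getLogger().error("The '" + ackIdAttributeName + "' attribute is expected to be an integer value but was: " + ackId);
               session.transfer(flowFile, RELATIONSHIP_FAILURE);
           }
       }
       ```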

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunkHTTP.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dto.splunk.SendRawDataResponse;
+import org.apache.nifi.dto.splunk.SendRawDataSuccessResponse;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"splunk", "logs", "http"})
+@CapabilityDescription("Sends events to the specified Splunk server over HTTP or HTTPS. Supports HEC Index Acknowledgement.")
+public class PutSplunkHTTP extends SplunkAPICall {

Review comment:
       Should probably add a `@SeeAlso` annotation pointing to QuerySplunkIndexingStatus.class
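
       Mirroring the suggestion on QuerySplunkIndexingStatus:

       ```java
       @SeeAlso(QuerySplunkIndexingStatus.class)
       public class PutSplunkHTTP extends SplunkAPICall {
           // ... existing body unchanged ...
       }
       ```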

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();

Review comment:
       Member variables should be private. If needed in subclasses, they should be exposed through protected methods.
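
       For example, a minimal sketch (keeping the existing import of `JsonFactory`):

       ```java
       abstract class SplunkAPICall extends AbstractProcessor {
           private final JsonFactory jsonFactory = new JsonFactory();

           // Read-only access for subclasses; the field itself stays private
           protected JsonFactory getJsonFactory() {
               return jsonFactory;
           }

           // ... rest of the class unchanged ...
       }
       ```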

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/resources/docs/org.apache.nifi.processors.splunk.PutSplunkHTTP/additionalDetails.html
##########
@@ -0,0 +1,76 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>PutSplunkHTTP</title>
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>PutSplunkHTTP</h2>
+
+<p>
+    This processor serves as a counterpart for PutSplunk processor. While the later solves communication using TCP and
+    UDP protocols, PutSplunkHTTP aims to send events into Splunk via HTTP or HTTPS. In this fashion, this processor
+    shows similarities with GetSplunk processor and the properties relevant to the connection with Splunk server are
+    identical. There are however some aspects unique for this processor:
+</p>
+
+<h3>Content details</h3>
+
+<p>
+    PutSplunkHTTP allows the user to specify some metadata about the event is being sent to the Splunk. These include: the
+    "Character Type" and the "Content Type" of the flow file content, using the matching properties. If the incoming
+    flow file has "mime.type" attribute, the processor will use it, expect the "Content Type" property is set, in which
+    case the property will override the flow file attribute.
+</p>
+
+<h3>Event parameters</h3>
+
+<p>
+    The "Source", "Source Type", "Host" and "Index" properties are optional and will be set by Splunk if unspecified. If set,
+    the default values will be overwritten by user specified ones. For more details about the Splunk API, please visit
+    <a href="https://docs.splunk.com/Documentation/Splunk/LATEST/RESTREF/RESTinput#services.2Fcollector.2Fraw">this documentation</a>.
+</p>
+
+<h3>Acknowledgements</h3>
+
+<p>
+    HTTP Event Collector (HEC) in Splunk provides the possibility of index acknowledgement, which can be used to monitor
+    the indexing status of the individual events. PutSplunkHTTP supports this feature by enriching the outgoing flow file
+    with the necessary information, making it possible for a later processor to poll the status based on. The necessary
+    information for this is stored within the flow file attributes, specified by the "Splunk Acknowledgement Id Attribute Name"
+    and "Splunk Sent At Attribute Name".
+</p>
+
+<p>
+    For further steps of acknowledgement handling in NiFi side, please refer to QuerySplunkIndexingStatus processor. For more
+    details about the index acknowledgement, please visit <a href="https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck">this documentation</a>.
+</p>
+
+<h3>Error information</h3>
+
+<p>
+    For more refined processing, flow files are extended with additional information if possible. The information is stored

Review comment:
       I think 'enriched' is probably the right word here, rather than 'extended'. Or even 'updated'.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org