You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/12/08 19:37:40 UTC

[GitHub] [nifi] exceptionfactory commented on a change in pull request #4714: NIFI-7801 Adding support for HTTP based Splunk put and indexed acknowledgement

exceptionfactory commented on a change in pull request #4714:
URL: https://github.com/apache/nifi/pull/4714#discussion_r538744071



##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));

Review comment:
       The Splunk Service object has a setSSLSocketFactory method that would allow leveraging the NiFi StandardRestrictedSSLContextService instead of providing custom SSL properties.  Did you evaluate using that method instead of providing this Security Protocol property?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (splunkService != null) {
+            splunkService.logout();
+            splunkService = null;
+        }
+
+        requestChannel = null;
+        ackIdAttributeName = null;
+        insertedAtAttributeName = null;
+        splunkServiceArguments = null;
+    }
+
+    protected ResponseMessage call(final String endpoint, final RequestMessage request)  {
+        request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
+
+        try {
+            return splunkService.send(endpoint, request);
+            //Catch Stale connection exception, reinitialize, and retry
+        } catch (final com.splunk.HttpException e) {

Review comment:
       Is there a reason for using the qualified class as opposed to importing it?

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestQuerySplunkIndexingStatus {
+    private static final String EVENT = "{\"a\"=\"b\",\"c\"=\"d\",\"e\"=\"f\"}";
+
+    @Mock
+    private Service service;
+
+    @Mock
+    private ResponseMessage response;
+
+    private MockedQuerySplunkIndexingStatus processor;
+    private TestRunner testRunner;
+
+    private ArgumentCaptor<String> path;
+    private ArgumentCaptor<RequestMessage> request;
+
+    @Before
+    public void setUp() {
+        processor = new MockedQuerySplunkIndexingStatus(service);
+        testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(SplunkAPICall.SCHEME, "http");
+        testRunner.setProperty(SplunkAPICall.TOKEN, "Splunk 888c5a81-8777-49a0-a3af-f76e050ab5d9");
+        testRunner.setProperty(SplunkAPICall.REQUEST_CHANNEL, "22bd7414-0d77-4c73-936d-c8f5d1b21862");
+
+        path = ArgumentCaptor.forClass(String.class);
+        request = ArgumentCaptor.forClass(RequestMessage.class);
+        Mockito.when(service.send(path.capture(), request.capture())).thenReturn(response);
+    }
+
+    @After
+    public void tearDown() {
+        testRunner.shutdown();
+    }
+
+    @Test
+    public void testHappyPath() throws Exception {

Review comment:
       Recommend renaming to testRunSuccess()

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());

Review comment:
       Recommend eliminating TLSv1 and SSLv3 as options since they are generally considered to be vulnerable protocols.  SSLv3 is also disabled on most recent Java installations, so it may not work even if selected.  As mentioned in a separate comment, refactoring to leverage the NiFi SSLContextService would delegate the protocol selection to that service as opposed to introducing options specific to this Processor.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor

Review comment:
       Although the Scheme, Hostname, and Port properties mirror the options for the Splunk Service, did you consider providing the NiFi property definition as a URL?  Providing the NiFi property as a URL would still allow for retrieving the scheme, hostname, and port elements for instantiation, and could also simplify the NiFi Processor configuration.

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (splunkService != null) {
+            splunkService.logout();
+            splunkService = null;
+        }
+
+        requestChannel = null;
+        ackIdAttributeName = null;
+        insertedAtAttributeName = null;
+        splunkServiceArguments = null;
+    }
+
+    protected ResponseMessage call(final String endpoint, final RequestMessage request)  {
+        request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
+
+        try {
+            return splunkService.send(endpoint, request);
+            //Catch Stale connection exception, reinitialize, and retry
+        } catch (final com.splunk.HttpException e) {
+            getLogger().error("Splunk request status code:" + e.getStatus() + " Retrying the request.");

Review comment:
       Recommend adjusting the logging statement to pass the status as a placeholder argument instead of concatenating strings:
   ```suggestion
               getLogger().error("Splunk failed with HTTP {}: Retrying the request", new Object[]{ e.getStatus() });
   ```

##########
File path: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/SplunkAPICall.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.splunk;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.splunk.RequestMessage;
+import com.splunk.ResponseMessage;
+import com.splunk.SSLSecurityProtocol;
+import com.splunk.Service;
+import com.splunk.ServiceArgs;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+abstract class SplunkAPICall extends AbstractProcessor {
+    private static final String REQUEST_CHANNEL_HEADER_NAME = "X-Splunk-Request-Channel";
+
+    private static final String HTTP_SCHEME = "http";
+    private static final String HTTPS_SCHEME = "https";
+
+    private static final AllowableValue TLS_1_2_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_2.name(), SSLSecurityProtocol.TLSv1_2.name());
+    private static final AllowableValue TLS_1_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1_1.name(), SSLSecurityProtocol.TLSv1_1.name());
+    private static final AllowableValue TLS_1_VALUE = new AllowableValue(SSLSecurityProtocol.TLSv1.name(), SSLSecurityProtocol.TLSv1.name());
+    private static final AllowableValue SSL_3_VALUE = new AllowableValue(SSLSecurityProtocol.SSLv3.name(), SSLSecurityProtocol.SSLv3.name());
+
+    static final PropertyDescriptor SCHEME = new PropertyDescriptor.Builder()
+            .name("Scheme")
+            .description("The scheme for connecting to Splunk.")
+            .allowableValues(HTTPS_SCHEME, HTTP_SCHEME)
+            .defaultValue(HTTPS_SCHEME)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the Splunk server.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The HTTP Port Number of the Splunk server.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("9088")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("Security Protocol")
+            .description("The security protocol to use for communicating with Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(TLS_1_2_VALUE, TLS_1_1_VALUE, TLS_1_VALUE, SSL_3_VALUE)
+            .defaultValue(TLS_1_2_VALUE.getValue())
+            .build();
+
+    static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder()
+            .name("Owner")
+            .description("The owner to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN = new PropertyDescriptor.Builder()
+            .name("Token")
+            .description("The token to pass to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("Username")
+            .description("The username to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("Password")
+            .description("The password to authenticate to Splunk.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
+
+    static final PropertyDescriptor REQUEST_CHANNEL = new PropertyDescriptor.Builder()
+            .name("request-channel")
+            .displayName("Splunk Request Channel")
+            .description("Identifier of the used request channel.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_ACK_ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-ack-id-attribute-name")
+            .displayName("Splunk Acknowledgement Id Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the Splunk acknowledgement id.")
+            .defaultValue("splunk_acknowledgement_id")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor SPLUNK_SENT_AT_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("splunk-sent-at-attribute")
+            .displayName("Splunk Sent At Attribute Name")
+            .description("Specifies which flow file attribute will be used to store the time of sending the event into Splunk.")
+            .defaultValue("splunk_send_at")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected static final List<PropertyDescriptor> PROPERTIES = Arrays.asList(
+            SCHEME,
+            HOSTNAME,
+            PORT,
+            SECURITY_PROTOCOL,
+            OWNER,
+            TOKEN,
+            USERNAME,
+            PASSWORD,
+            REQUEST_CHANNEL,
+            SPLUNK_ACK_ID_ATTRIBUTE,
+            SPLUNK_SENT_AT_ATTRIBUTE
+    );
+
+    protected final JsonFactory jsonFactory = new JsonFactory();
+    protected final ObjectMapper jsonObjectMapper = new ObjectMapper(jsonFactory);
+
+    protected volatile ServiceArgs splunkServiceArguments;
+    protected volatile Service splunkService;
+    protected volatile String requestChannel;
+    protected volatile String ackIdAttributeName;
+    protected volatile String insertedAtAttributeName;
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return SplunkAPICall.PROPERTIES;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        splunkServiceArguments = getSplunkServiceArgs(context);
+        splunkService = getSplunkService(splunkServiceArguments);
+        requestChannel = context.getProperty(SplunkAPICall.REQUEST_CHANNEL).evaluateAttributeExpressions().getValue();
+        ackIdAttributeName = context.getProperty(SplunkAPICall.SPLUNK_ACK_ID_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+        insertedAtAttributeName = context.getProperty(SplunkAPICall.SPLUNK_SENT_AT_ATTRIBUTE).evaluateAttributeExpressions().getValue();
+    }
+
+    private ServiceArgs getSplunkServiceArgs(final ProcessContext context) {
+        final ServiceArgs splunkServiceArguments = new ServiceArgs();
+
+        splunkServiceArguments.setScheme(context.getProperty(SCHEME).getValue());
+        splunkServiceArguments.setHost(context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue());
+        splunkServiceArguments.setPort(context.getProperty(PORT).evaluateAttributeExpressions().asInteger());
+
+        if (context.getProperty(OWNER).isSet()) {
+            splunkServiceArguments.setOwner(context.getProperty(OWNER).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(TOKEN).isSet()) {
+            splunkServiceArguments.setToken(context.getProperty(TOKEN).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(USERNAME).isSet()) {
+            splunkServiceArguments.setUsername(context.getProperty(USERNAME).evaluateAttributeExpressions().getValue());
+        }
+
+        if (context.getProperty(PASSWORD).isSet()) {
+            splunkServiceArguments.setPassword(context.getProperty(PASSWORD).getValue());
+        }
+
+        if (HTTPS_SCHEME.equals(context.getProperty(SCHEME).getValue()) && context.getProperty(SECURITY_PROTOCOL).isSet()) {
+            splunkServiceArguments.setSSLSecurityProtocol(SSLSecurityProtocol.valueOf(context.getProperty(SECURITY_PROTOCOL).getValue()));
+        }
+
+        return splunkServiceArguments;
+    }
+
+    protected Service getSplunkService(final ServiceArgs splunkServiceArguments) {
+        return Service.connect(splunkServiceArguments);
+    }
+
+    @OnUnscheduled
+    public void onUnscheduled() {
+        if (splunkService != null) {
+            splunkService.logout();
+            splunkService = null;
+        }
+
+        requestChannel = null;
+        ackIdAttributeName = null;
+        insertedAtAttributeName = null;
+        splunkServiceArguments = null;
+    }
+
+    protected ResponseMessage call(final String endpoint, final RequestMessage request)  {
+        request.getHeader().put(REQUEST_CHANNEL_HEADER_NAME, requestChannel);
+
+        try {
+            return splunkService.send(endpoint, request);
+            //Catch Stale connection exception, reinitialize, and retry
+        } catch (final com.splunk.HttpException e) {
+            getLogger().error("Splunk request status code:" + e.getStatus() + " Retrying the request.");
+            splunkService.logout();
+            splunkService = getSplunkService(splunkServiceArguments);
+            return splunkService.send(endpoint, request);
+        }
+    }
+
+    protected <T> T extractResult(final InputStream responseBody, final Class<T> type) throws IOException {
+        final JsonParser jsonParser = jsonFactory.createParser(responseBody);
+        jsonParser.setCodec(jsonObjectMapper);
+        return jsonParser.readValueAs(type);

Review comment:
       Is there a reason for not using `jsonObjectMapper.readValue(responseBody, type)` here, rather than creating a `JsonParser` and setting its codec by hand?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org