You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/04/20 19:03:30 UTC

[nifi] branch main updated: NIFI-9720 Added QuerySalesforceObject Processor

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 73ee1a9a1e NIFI-9720 Added QuerySalesforceObject Processor
73ee1a9a1e is described below

commit 73ee1a9a1e355d31f348a82acdfff4a387ef485b
Author: Lehel <Le...@hotmail.com>
AuthorDate: Wed Feb 23 16:07:00 2022 +0100

    NIFI-9720 Added QuerySalesforceObject Processor
    
    This closes #5802
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 nifi-assembly/pom.xml                              |   6 +
 .../nifi-salesforce-nar/pom.xml                    |  46 ++
 .../nifi-salesforce-processors/pom.xml             | 122 ++++++
 .../salesforce/QuerySalesforceObject.java          | 486 +++++++++++++++++++++
 .../salesforce/util/SalesforceRestService.java     |  91 ++++
 .../util/SalesforceToRecordSchemaConverter.java    | 120 +++++
 .../services/org.apache.nifi.processor.Processor   |  15 +
 .../salesforce/QuerySalesforceObjectIT.java        |  79 ++++
 .../salesforce/util/SalesforceConfigAware.java     |  57 +++
 .../salesforce/util/SalesforceRestServiceIT.java   |  80 ++++
 .../SalesforceToRecordSchemaConverterTest.java     | 169 +++++++
 .../resources/converter/complex_sf_schema.json     |  16 +
 .../test/resources/converter/simple_sf_schema.json |  76 ++++
 .../converter/unknown_type_sf_schema.json          |  11 +
 nifi-nar-bundles/nifi-salesforce-bundle/pom.xml    |  32 ++
 nifi-nar-bundles/pom.xml                           |   1 +
 16 files changed, 1407 insertions(+)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index becd82c40e..0501212288 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -839,6 +839,12 @@ language governing permissions and limitations under the License. -->
             <version>1.17.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-salesforce-nar</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
         <!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
         <!-- TODO: remove these once minimum Java version is 11 -->
         <dependency>
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml
new file mode 100644
index 0000000000..68baa4d07d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Packaging module of the Salesforce bundle: wraps the processors module into a "nar" artifact. -->
+    <parent>
+        <artifactId>nifi-salesforce-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-salesforce-nar</artifactId>
+
+    <packaging>nar</packaging>
+    <properties>
+        <!-- Javadoc generation is skipped for this module. -->
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <!-- NOTE(review): presumably disables source-jar packaging; confirm which plugin reads source.skip. -->
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <!-- The processors bundled into this archive. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-salesforce-processors</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <!-- Dependency of type "nar"; the processors module declares the record serialization
+             artifacts with provided scope under the heading "Provided from parent nar". -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services-nar</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml
new file mode 100644
index 0000000000..07071778ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-salesforce-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-salesforce-processors</artifactId>
+
+    <dependencies>
+        <!-- Compile dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <!-- The wildcard exclusion drops every transitive dependency of camel-salesforce;
+             only the artifact itself is added to the classpath. -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-salesforce</artifactId>
+            <version>3.14.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jsonSchema</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <!-- Provided from parent nar -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Test dependencies -->
+        <!-- mockwebserver is test-scoped, so it is grouped here rather than with the provided artifacts -->
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-oauth2-provider-service</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <!-- test data -->
+                        <exclude>src/test/resources/**/*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
new file mode 100644
index 0000000000..89d675e1fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Source processor that queries a Salesforce sObject through the REST API and writes the returned
+ * records to a FlowFile using the configured Record Writer.
+ * <p>
+ * It accepts no incoming FlowFiles and keeps the upper bound of the last age-based query in cluster
+ * scoped state so that a subsequent run can continue from that point.
+ */
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"salesforce", "sobject", "soql", "query"})
+@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can add arbitrary filter conditions by setting the 'Custom WHERE Condition' property."
+        + " Supports incremental retrieval: users can define a field in the 'Age Field' property that will be used to determine when the record was created."
+        + " When this property is set the processor will retrieve new records. It's also possible to define an initial cutoff value for the age, filtering out all older records"
+        + " even for the first run. This processor is intended to be run on the Primary Node only."
+        + " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output.")
+@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, after performing a query the time of execution is stored. Subsequent queries will be augmented"
+        + " with an additional condition so that only records that are newer than the stored execution time (adjusted with the optional value of 'Age Delay') will be retrieved."
+        + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+        + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+public class QuerySalesforceObject extends AbstractProcessor {
+
+    /** Base URL of the Salesforce instance; read in onScheduled and handed to the REST service. */
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description("The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    /** REST API version; read in onScheduled and handed to the REST service. */
+    static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description("The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+            .required(true)
+            .defaultValue("54.0")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .build();
+
+    /** Name of the sObject used both for the describe call and in the FROM part of the query. */
+    static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder()
+            .name("sobject-name")
+            .displayName("sObject Name")
+            .description("The Salesforce sObject to be queried")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    /** Field list placed after SELECT in the query and passed to the schema conversion. */
+    static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder()
+            .name("field-names")
+            .displayName("Field Names")
+            .description("Comma-separated list of field names requested from the sObject to be queried")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    /** Read timeout; converted to milliseconds in onScheduled. */
+    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    /** Controller service that supplies the access token used by the REST service. */
+    static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description("Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+            .required(true)
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .build();
+
+    /** Controller service that creates the writer for the outgoing FlowFile content. */
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Service used for writing records returned from the Salesforce REST API")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    /** Controls whether onTrigger keeps or removes a FlowFile that contains zero records. */
+    static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new PropertyDescriptor.Builder()
+            .name("create-zero-record-files")
+            .displayName("Create Zero Record FlowFiles")
+            .description("Specifies whether or not to create a FlowFile when the Salesforce REST API does not return any records")
+            .required(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    /**
+     * Name of the timestamp field used for incremental retrieval. When set, buildQuery adds
+     * "field &gt;= lower bound" (if a lower bound is known) and "field &lt; upper bound" conditions.
+     */
+    static final PropertyDescriptor AGE_FIELD = new PropertyDescriptor.Builder()
+            .name("age-field")
+            .displayName("Age Field")
+            .description("The name of a TIMESTAMP field that will be used to filter out records that have already been retrieved."
+                    + " Only records whose value in this field is newer than the time recorded at the previous run of this processor will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    /** Amount subtracted from the current time in onTrigger to obtain the upper bound of the age filter. */
+    static final PropertyDescriptor AGE_DELAY = new PropertyDescriptor.Builder()
+            .name("age-delay")
+            .displayName("Age Delay")
+            .description("When 'Age Field' is set the age-based filter will be adjusted by this amount."
+                    + " Only records that are older than the current run time of this processor, by at least this amount, will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(AGE_FIELD)
+            .build();
+
+    /**
+     * Lower bound used by buildQuery when no previous upper bound is stored in state.
+     * The value is inserted into the query text as configured.
+     */
+    static final PropertyDescriptor INITIAL_AGE_FILTER = new PropertyDescriptor.Builder()
+            .name("initial-age-filter")
+            .displayName("Initial Age Filter")
+            .description("When 'Age Field' is set the value of this property will serve as a filter when this processor runs the first time."
+                    + " Only records that are not older than this value will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(AGE_FIELD)
+            .build();
+
+    /**
+     * Free-form condition that buildQuery wraps in parentheses and joins with the age conditions using AND.
+     */
+    // NOTE(review): the scope is FLOWFILE_ATTRIBUTES although the processor is annotated INPUT_FORBIDDEN,
+    // so there is no incoming FlowFile to evaluate against - confirm whether VARIABLE_REGISTRY was intended.
+    static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new PropertyDescriptor.Builder()
+            .name("custom-where-condition")
+            .displayName("Custom WHERE Condition")
+            .description("A custom expression to be added in the WHERE clause of the query")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    // The only relationship of this processor; onTrigger transfers the created FlowFile here.
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful query.")
+            .build();
+
+    // State key under which the upper bound of the last age-based query is stored.
+    private static final String LAST_AGE_FILTER = "last_age_filter";
+    // Name of the field the JSON reader starts from and of the array field in the query result schema.
+    private static final String STARTING_FIELD_NAME = "records";
+    // Date, time and timestamp patterns passed to the schema converter and to the JSON record reader.
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
+    private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
+
+    // Both collaborators are (re)created in onScheduled.
+    private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
+    private volatile SalesforceRestService salesforceRestService;
+
+    /**
+     * Creates the schema converter and the REST service from the current property values.
+     * <p>
+     * API Version, URL and Read Timeout all declare VARIABLE_REGISTRY expression language support,
+     * so each of them is evaluated before its value is used.
+     *
+     * @param context provides the configured property values and the access token provider service
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        salesForceToRecordSchemaConverter = new SalesforceToRecordSchemaConverter(
+                DATE_FORMAT,
+                DATE_TIME_FORMAT,
+                TIME_FORMAT
+        );
+
+        // Without evaluation an expression such as ${salesforce.url} would be passed on as literal text
+        final String salesforceVersion = context.getProperty(API_VERSION).evaluateAttributeExpressions().getValue();
+        final String baseUrl = context.getProperty(API_URL).evaluateAttributeExpressions().getValue();
+        final int readTimeoutMillis = context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final OAuth2AccessTokenProvider accessTokenProvider = context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                // the token is requested from the provider each time the supplier is invoked
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                readTimeoutMillis
+        );
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                API_URL,
+                API_VERSION,
+                SOBJECT_NAME,
+                FIELD_NAMES,
+                READ_TIMEOUT,
+                TOKEN_PROVIDER,
+                RECORD_WRITER,
+                CREATE_ZERO_RECORD_FILES,
+                AGE_FIELD,
+                INITIAL_AGE_FILTER,
+                AGE_DELAY,
+                CUSTOM_WHERE_CONDITION
+        ));
+    }
+
+    /**
+     * @return a new set holding the single "success" relationship
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(Collections.singleton(REL_SUCCESS));
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+        if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && !validationContext.getProperty(AGE_FIELD).isSet()) {
+            results.add(
+                    new ValidationResult.Builder()
+                            .subject(INITIAL_AGE_FILTER.getDisplayName())
+                            .valid(false)
+                            .explanation("it requires " + AGE_FIELD.getDisplayName() + " also to be set.")
+                            .build()
+            );
+        }
+        return results;
+    }
+
+    /**
+     * Runs one query against the configured sObject and writes the returned records to a new FlowFile.
+     * <p>
+     * Steps: read the previous upper bound from cluster state, compute the new upper bound (current time
+     * minus the optional 'Age Delay') when 'Age Field' is set, describe the sObject to derive the record
+     * schema, build and execute the query, and stream the records through the Record Writer. When the
+     * write succeeds and an upper bound was computed, it is stored in state. A FlowFile without records is
+     * removed unless 'Create Zero Record FlowFiles' is true.
+     * <p>
+     * Properties that declare VARIABLE_REGISTRY expression language support are evaluated before use.
+     *
+     * @param context provides property values and the state manager
+     * @param session used to create, write and transfer the FlowFile
+     * @throws ProcessException if state access fails, the writer cannot be created or the response cannot be read
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        String sObject = context.getProperty(SOBJECT_NAME).evaluateAttributeExpressions().getValue();
+        String fields = context.getProperty(FIELD_NAMES).evaluateAttributeExpressions().getValue();
+        // NOTE(review): CUSTOM_WHERE_CONDITION declares FLOWFILE_ATTRIBUTES scope but there is no incoming
+        // FlowFile to evaluate against, so the value is still read without evaluation - confirm intended scope.
+        String customWhereClause = context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
+        RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        boolean createZeroRecordFlowFiles = context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean();
+
+        String ageField = context.getProperty(AGE_FIELD).evaluateAttributeExpressions().getValue();
+        String initialAgeFilter = context.getProperty(INITIAL_AGE_FILTER).evaluateAttributeExpressions().getValue();
+        Long ageDelayMs = context.getProperty(AGE_DELAY).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+
+        // Lower bound: the upper bound stored by the previous run, or null if nothing has been stored yet
+        String ageFilterLower;
+        StateMap state;
+        try {
+            state = context.getStateManager().getState(Scope.CLUSTER);
+            ageFilterLower = state.get(LAST_AGE_FILTER);
+        } catch (IOException e) {
+            throw new ProcessException("Last Age Filter state retrieval failed", e);
+        }
+
+        // Upper bound: current time, moved back by the age delay when one is configured
+        String ageFilterUpper;
+        if (ageField == null) {
+            ageFilterUpper = null;
+        } else {
+            OffsetDateTime ageFilterUpperTime;
+            if (ageDelayMs == null) {
+                ageFilterUpperTime = OffsetDateTime.now();
+            } else {
+                ageFilterUpperTime = OffsetDateTime.now().minus(ageDelayMs, ChronoUnit.MILLIS);
+            }
+            ageFilterUpper = ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+        }
+
+        ConvertedSalesforceSchema convertedSalesforceSchema = getConvertedSalesforceSchema(sObject, fields);
+
+        String querySObject = buildQuery(
+                sObject,
+                fields,
+                customWhereClause,
+                ageField,
+                initialAgeFilter,
+                ageFilterLower,
+                ageFilterUpper
+        );
+
+        FlowFile flowFile = session.create();
+
+        Map<String, String> originalAttributes = flowFile.getAttributes();
+        Map<String, String> attributes = new HashMap<>();
+
+        // Holder is needed because the record count is produced inside the write callback
+        AtomicInteger recordCountHolder = new AtomicInteger();
+
+        flowFile = session.write(flowFile, out -> {
+            try (
+                    InputStream querySObjectResultInputStream = salesforceRestService.query(querySObject);
+                    JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
+                            querySObjectResultInputStream,
+                            getLogger(),
+                            convertedSalesforceSchema.querySObjectResultSchema,
+                            DATE_FORMAT,
+                            TIME_FORMAT,
+                            DATE_TIME_FORMAT,
+                            StartingFieldStrategy.NESTED_FIELD,
+                            STARTING_FIELD_NAME
+                    );
+
+                    RecordSetWriter writer = writerFactory.createWriter(
+                            getLogger(),
+                            writerFactory.getSchema(
+                                    originalAttributes,
+                                    convertedSalesforceSchema.recordSchema
+                            ),
+                            out,
+                            originalAttributes
+                    )
+            ) {
+                writer.beginRecordSet();
+
+                Record querySObjectRecord;
+                while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+                    writer.write(querySObjectRecord);
+                }
+
+                WriteResult writeResult = writer.finishRecordSet();
+
+                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+
+                recordCountHolder.set(writeResult.getRecordCount());
+
+                // Remember the upper bound so that the next run starts where this one ended
+                if (ageFilterUpper != null) {
+                    Map<String, String> newState = new HashMap<>(state.toMap());
+                    newState.put(LAST_AGE_FILTER, ageFilterUpper);
+                    updateState(context, newState);
+                }
+            } catch (SchemaNotFoundException e) {
+                throw new ProcessException("Couldn't create record writer", e);
+            } catch (MalformedRecordException e) {
+                throw new ProcessException("Couldn't read records from input", e);
+            }
+        });
+
+        int recordCount = recordCountHolder.get();
+
+        if (!createZeroRecordFlowFiles && recordCount == 0) {
+            session.remove(flowFile);
+        } else {
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            session.transfer(flowFile, REL_SUCCESS);
+
+            session.adjustCounter("Records Processed", recordCount, false);
+            getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
+        }
+    }
+
+    private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) {
+        try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) {
+            return convertSchema(describeSObjectResult, fields);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Salesforce input stream close failed", e);
+        }
+    }
+
+    /**
+     * Stores the given map as the cluster scoped state of this processor.
+     *
+     * @param context  gives access to the state manager
+     * @param newState complete state map to store
+     * @throws ProcessException if the state manager reports an IOException
+     */
+    private void updateState(final ProcessContext context, final Map<String, String> newState) {
+        try {
+            context.getStateManager().setState(newState, Scope.CLUSTER);
+        } catch (final IOException stateFailure) {
+            throw new ProcessException("Last Age Filter state update failed", stateFailure);
+        }
+    }
+
+    protected ConvertedSalesforceSchema convertSchema(InputStream describeSObjectResult, String fields) {
+        try {
+            RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields);
+
+            RecordSchema querySObjectResultSchema = new SimpleRecordSchema(Collections.singletonList(
+                    new RecordField(STARTING_FIELD_NAME, RecordFieldType.ARRAY.getArrayDataType(
+                            RecordFieldType.RECORD.getRecordDataType(
+                                    recordSchema
+                            )
+                    ))
+            ));
+
+            return new ConvertedSalesforceSchema(querySObjectResultSchema, recordSchema);
+        } catch (IOException e) {
+            throw new ProcessException("SObject to Record schema conversion failed", e);
+        }
+    }
+
+    protected String buildQuery(
+            String sObject,
+            String fields,
+            String customWhereClause,
+            String ageField,
+            String initialAgeFilter,
+            String ageFilterLower,
+            String ageFilterUpper
+    ) {
+        StringBuilder queryBuilder = new StringBuilder("SELECT ")
+                .append(fields)
+                .append(" FROM ")
+                .append(sObject);
+
+        List<String> whereItems = new ArrayList<>();
+        if (customWhereClause != null) {
+            whereItems.add("( " + customWhereClause + " )");
+        }
+
+        if (ageField != null) {
+            if (ageFilterLower != null) {
+                whereItems.add(ageField + " >= " + ageFilterLower);
+            } else if (initialAgeFilter != null) {
+                whereItems.add(ageField + " >= " + initialAgeFilter);
+            }
+
+            whereItems.add(ageField + " < " + ageFilterUpper);
+        }
+
+        if (!whereItems.isEmpty()) {
+            String finalWhereClause = String.join(" AND ", whereItems);
+            queryBuilder.append(" WHERE ").append(finalWhereClause);
+        }
+
+        return queryBuilder.toString();
+    }
+
+    /**
+     * Holder for the two schemas produced by convertSchema.
+     */
+    static class ConvertedSalesforceSchema {
+        /** Schema of the query response; used by the JSON record reader. */
+        RecordSchema querySObjectResultSchema;
+        /** Schema of a single record; used when creating the record writer. */
+        RecordSchema recordSchema;
+
+        public ConvertedSalesforceSchema(RecordSchema responseSchema, RecordSchema singleRecordSchema) {
+            recordSchema = singleRecordSchema;
+            querySObjectResultSchema = responseSchema;
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
new file mode 100644
index 0000000000..bc3f746158
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * Small HTTP client for two Salesforce REST endpoints: describing an sObject and running a query.
+ * Every request carries a bearer token obtained from the supplied access token provider at the
+ * moment the request is built.
+ */
+public class SalesforceRestService {
+    private final String version;
+    private final String baseUrl;
+    private final Supplier<String> accessTokenProvider;
+    private final OkHttpClient httpClient;
+
+    /**
+     * @param version API version that is placed after "/services/data/v" in every request URL
+     * @param baseUrl scheme and host the request URLs are prefixed with
+     * @param accessTokenProvider invoked once per request to get the bearer token
+     * @param responseTimeoutMillis read timeout of the underlying HTTP client, in milliseconds
+     */
+    public SalesforceRestService(String version, String baseUrl, Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
+        this.version = version;
+        this.baseUrl = baseUrl;
+        this.accessTokenProvider = accessTokenProvider;
+        this.httpClient = new OkHttpClient.Builder()
+                .readTimeout(responseTimeoutMillis, TimeUnit.MILLISECONDS)
+                .build();
+    }
+
+    /**
+     * Requests the description of the given sObject.
+     *
+     * @param sObject name of the sObject, inserted verbatim into the URL path
+     * @return the response body as a stream; the caller is responsible for closing it
+     * @throws ProcessException if the request fails or the response status is not 200
+     */
+    public InputStream describeSObject(String sObject) {
+        final String describeUrl = versionedApiRoot() + "/sobjects/" + sObject + "/describe?maxRecords=1";
+
+        final Request describeRequest = authorizedRequestBuilder()
+                .url(describeUrl)
+                .get()
+                .build();
+
+        return request(describeRequest);
+    }
+
+    /**
+     * Runs the given query through the "query" endpoint, passing it as the "q" query parameter.
+     *
+     * @param query query text to execute
+     * @return the response body as a stream; the caller is responsible for closing it
+     * @throws ProcessException if the request fails or the response status is not 200
+     */
+    public InputStream query(String query) {
+        final HttpUrl queryUrl = HttpUrl.get(versionedApiRoot() + "/query")
+                .newBuilder()
+                .addQueryParameter("q", query)
+                .build();
+
+        final Request queryRequest = authorizedRequestBuilder()
+                .url(queryUrl)
+                .get()
+                .build();
+
+        return request(queryRequest);
+    }
+
+    // Common URL prefix shared by all endpoints of the configured API version.
+    private String versionedApiRoot() {
+        return baseUrl + "/services/data/v" + version;
+    }
+
+    // Starts a request builder that already carries the bearer token header.
+    private Request.Builder authorizedRequestBuilder() {
+        return new Request.Builder()
+                .addHeader("Authorization", "Bearer " + accessTokenProvider.get());
+    }
+
+    /**
+     * Executes the request and hands back the body stream of a 200 response. Any other outcome,
+     * including a non-200 status, closes the response (when there is one) and is reported as a
+     * {@link ProcessException} that names the request URL and wraps the original cause.
+     */
+    private InputStream request(Request request) {
+        Response response = null;
+        try {
+            response = httpClient.newCall(request).execute();
+            if (response.code() == 200) {
+                return response.body().byteStream();
+            }
+
+            final String bodyText = response.body() == null ? null : response.body().string();
+            throw new ProcessException("Invalid response" +
+                    " Code: " + response.code() +
+                    " Message: " + response.message() +
+                    " Body: " + bodyText
+            );
+        } catch (Exception e) {
+            if (response != null) {
+                response.close();
+            }
+            throw new ProcessException(String.format("Salesforce HTTP request failed [%s]", request.url()), e);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
new file mode 100644
index 0000000000..f4396415ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SalesforceToRecordSchemaConverter {
+
+    private final String dateFormat;
+    private final String dateTimeFormat;
+    private final String timeFormat;
+    private final ObjectMapper objectMapper;
+
+    public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) {
+        this.dateFormat = dateFormat;
+        this.dateTimeFormat = dateTimeFormat;
+        this.timeFormat = timeFormat;
+        objectMapper = JsonUtils.createObjectMapper();
+    }
+
+    public RecordSchema convertSchema(final InputStream describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws IOException {
+
+        final SObjectDescription sObjectDescription = objectMapper.readValue(describeSOjbectResultJsonString, SObjectDescription.class);
+        final List<String> listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
+        final List<SObjectField> fields = sObjectDescription.getFields()
+                .stream()
+                .filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
+                .collect(Collectors.toList());
+
+        final List<RecordField> recordFields = new ArrayList<>();
+
+        for (SObjectField field : fields) {
+            final String soapType = field.getSoapType();
+
+            switch (soapType.substring(soapType.indexOf(':') + 1)) {
+                case "ID":
+                case "string":
+                case "json":
+                case "base64Binary":
+                case "anyType":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.STRING.getDataType(), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "int":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.INT.getDataType(), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "long":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.LONG.getDataType(), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "double":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.DOUBLE.getDataType(), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "boolean":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.BOOLEAN.getDataType(), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "date":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.DATE.getDataType(dateFormat), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "dateTime":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "time":
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.TIME.getDataType(timeFormat), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "address":
+                    final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList(
+                            new RecordField("city", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("country", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("state", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("street", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
+                    ));
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(addressSchema), field.getDefaultValue(), field.isNillable()));
+                    break;
+                case "location":
+                    final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList(
+                            new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
+                            new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
+                    ));
+                    recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(locationSchema), field.getDefaultValue(), field.isNillable()));
+                    break;
+                default:
+                    throw new IllegalArgumentException(String.format("Could not create determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
+                            sObjectDescription.getName(), field.getName(), soapType));
+            }
+        }
+
+        return new SimpleRecordSchema(recordFields);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..503138fb69
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.salesforce.QuerySalesforceObject
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
new file mode 100644
index 0000000000..547b99d54c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Set constants in {@link SalesforceConfigAware}
+ */
+class QuerySalesforceObjectIT implements SalesforceConfigAware {
+    private TestRunner runner;
+
+    // Creates a runner for the processor and wires in the OAuth2 token provider service.
+    @BeforeEach
+    void setUp() throws Exception {
+        final Processor processorUnderTest = new QuerySalesforceObject();
+        runner = TestRunners.newTestRunner(processorUnderTest);
+
+        final StandardOauth2AccessTokenProvider tokenProvider = initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, tokenProvider.getIdentifier());
+    }
+
+    @AfterEach
+    void tearDown() {
+        runner.shutdown();
+    }
+
+    // Runs the processor once against the configured Salesforce instance and expects content on the success relationship.
+    @Test
+    void retrievesAndWritesRecords() throws Exception {
+        final RecordSetWriterFactory recordWriter = new MockRecordWriter();
+        runner.addControllerService("writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        // What to query
+        runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, "Account");
+        runner.setProperty(QuerySalesforceObject.FIELD_NAMES, "Id,name,CreatedDate");
+        // Where to query it
+        runner.setProperty(QuerySalesforceObject.API_VERSION, VERSION);
+        runner.setProperty(QuerySalesforceObject.API_URL, BASE_URL);
+        runner.setProperty(QuerySalesforceObject.RECORD_WRITER, recordWriter.getIdentifier());
+        // Age filtering
+        runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate");
+        runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER, "2022-01-06T08:43:24.000+0000");
+
+        runner.run();
+
+        final List<MockFlowFile> successFlowFiles = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
+        final MockFlowFile firstFlowFile = successFlowFiles.get(0);
+
+        assertNotNull(firstFlowFile.getContent());
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
new file mode 100644
index 0000000000..c5b73e5897
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+/**
+ * Set the following constants:<p>
+ * VERSION<p>
+ * BASE_URL<p>
+ * USERNAME<p>
+ * PASSWORD<p>
+ * CLIENT_ID<p>
+ * CLIENT_SECRET<p>
+ */
+public interface SalesforceConfigAware {
+    // Placeholder connection settings; replace with real values before running the integration tests.
+    String VERSION = "54.0";
+    String BASE_URL = "https://MyDomainName.my.salesforce.com";
+
+    // Token endpoint derived from BASE_URL, followed by the placeholder credentials.
+    String AUTHORIZATION_SERVER_URL = BASE_URL + "/services/oauth2/token";
+    String USERNAME = "???";
+    String PASSWORD = "???";
+    String CLIENT_ID = "???";
+    String CLIENT_SECRET = "???";
+
+    /**
+     * Registers a {@link StandardOauth2AccessTokenProvider} on the runner under the identifier
+     * "oauth2AccessTokenProvider", configures it from the constants of this interface and enables it.
+     *
+     * @param runner test runner that will own the controller service
+     * @return the enabled token provider
+     * @throws InitializationException if the controller service cannot be added to the runner
+     */
+    default StandardOauth2AccessTokenProvider initOAuth2AccessTokenProvider(TestRunner runner) throws InitializationException {
+        final StandardOauth2AccessTokenProvider tokenProvider = new StandardOauth2AccessTokenProvider();
+        runner.addControllerService("oauth2AccessTokenProvider", tokenProvider);
+
+        // Endpoint first, then user credentials, then client credentials
+        runner.setProperty(tokenProvider, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL);
+        runner.setProperty(tokenProvider, StandardOauth2AccessTokenProvider.USERNAME, USERNAME);
+        runner.setProperty(tokenProvider, StandardOauth2AccessTokenProvider.PASSWORD, PASSWORD);
+        runner.setProperty(tokenProvider, StandardOauth2AccessTokenProvider.CLIENT_ID, CLIENT_ID);
+        runner.setProperty(tokenProvider, StandardOauth2AccessTokenProvider.CLIENT_SECRET, CLIENT_SECRET);
+
+        runner.enableControllerService(tokenProvider);
+        return tokenProvider;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
new file mode 100644
index 0000000000..a82b9ac863
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Set constants in {@link SalesforceConfigAware}
+ */
+class SalesforceRestServiceIT implements SalesforceConfigAware {
+    private TestRunner runner;
+    private SalesforceRestService testSubject;
+
+    // Builds the service under test with a token supplier backed by an enabled OAuth2 provider.
+    @BeforeEach
+    void setUp() throws Exception {
+        // The runner is only needed to host the controller service, so the processor does nothing.
+        final AbstractSessionFactoryProcessor noOpProcessor = new AbstractSessionFactoryProcessor() {
+            @Override
+            public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+            }
+        };
+        runner = TestRunners.newTestRunner(noOpProcessor);
+
+        final StandardOauth2AccessTokenProvider tokenProvider = initOAuth2AccessTokenProvider(runner);
+
+        testSubject = new SalesforceRestService(
+                VERSION,
+                BASE_URL,
+                () -> tokenProvider.getAccessDetails().getAccessToken(),
+                5_000
+        );
+    }
+
+    @AfterEach
+    void tearDown() {
+        runner.shutdown();
+    }
+
+    // The describe call must hand back a stream.
+    @Test
+    void describeSObjectSucceeds() throws IOException {
+        try (InputStream describeResponse = testSubject.describeSObject("Account")) {
+            assertNotNull(describeResponse);
+        }
+    }
+
+    // The query call must hand back a stream.
+    @Test
+    void querySucceeds() throws IOException {
+        try (InputStream queryResponse = testSubject.query("SELECT id,BillingAddress FROM Account")) {
+            assertNotNull(queryResponse);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
new file mode 100644
index 0000000000..f65506a0aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link SalesforceToRecordSchemaConverter} driven by the JSON fixtures under
+ * src/test/resources/converter.
+ */
+class SalesforceToRecordSchemaConverterTest {
+
+    private static final String TEST_PATH = "src/test/resources/converter/";
+    // Pattern letters are case-sensitive: "MM" is month ("mm" is minute), "HH" is hour of day ("hh" is the 12-hour clock)
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final String TIME_STAMP_FORMAT = "yyyy-MM-dd / HH:mm:ss";
+    private static final String TIME_FORMAT = "HH:mm:ss";
+
+    private static final SalesforceToRecordSchemaConverter converter = new SalesforceToRecordSchemaConverter(DATE_FORMAT, TIME_STAMP_FORMAT, TIME_FORMAT);
+
+    @Test
+    void testConvertSchema() throws IOException {
+        final String fieldNames = "ExampleInt,ExampleLong,ExampleDouble,ExampleBoolean," +
+                "ExampleID,ExampleString,ExampleJson,ExampleBase64Binary,ExampleAnyType," +
+                "ExampleDate,ExampleDateTime,ExampleTime";
+
+        final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+                // primitives
+                new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
+                new RecordField("ExampleLong", RecordFieldType.LONG.getDataType()),
+                new RecordField("ExampleDouble", RecordFieldType.DOUBLE.getDataType()),
+                new RecordField("ExampleBoolean", RecordFieldType.BOOLEAN.getDataType()),
+                // string types
+                new RecordField("ExampleID", RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleString", RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleJson", RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleBase64Binary", RecordFieldType.STRING.getDataType()),
+                new RecordField("ExampleAnyType", RecordFieldType.STRING.getDataType()),
+                // date types
+                new RecordField("ExampleDate", RecordFieldType.DATE.getDataType(DATE_FORMAT)),
+                new RecordField("ExampleDateTime", RecordFieldType.TIMESTAMP.getDataType(TIME_STAMP_FORMAT)),
+                new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT))
+        ));
+
+        assertConvertsTo(expected, "simple_sf_schema.json", fieldNames);
+    }
+
+    @Test
+    void testConvertComplexTypes() throws IOException {
+        final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("city", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("country", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("state", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("street", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
+        ));
+
+        final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
+                new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
+        ));
+
+        final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("ExampleAddress", RecordFieldType.RECORD.getRecordDataType(addressSchema)),
+                new RecordField("ExampleLocation", RecordFieldType.RECORD.getRecordDataType(locationSchema))
+        ));
+
+        assertConvertsTo(expected, "complex_sf_schema.json", "ExampleAddress,ExampleLocation");
+    }
+
+    @Test
+    void testSelectFields() throws IOException {
+        final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
+                new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT))
+        ));
+
+        assertConvertsTo(expected, "simple_sf_schema.json", "ExampleInt,ExampleTime");
+    }
+
+    @Test
+    void testSelectEmptyFields() throws IOException {
+        final RecordSchema expected = new SimpleRecordSchema(Collections.emptyList());
+
+        assertConvertsTo(expected, "simple_sf_schema.json", "");
+    }
+
+    @Test
+    void testConvertEmptySchema() throws IOException {
+        try (final InputStream sfSchema = IOUtils.toInputStream("", Charset.defaultCharset())) {
+            assertThrows(MismatchedInputException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
+        }
+    }
+
+    @Test
+    void testConvertNullSchema() {
+        final InputStream sfSchema = null;
+        assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
+    }
+
+    @Test
+    void testConvertUnknownDataType() throws IOException {
+        try (final InputStream sfSchema = readFile(TEST_PATH + "unknown_type_sf_schema.json")) {
+            final String fieldNames = "FieldWithUnknownType";
+            final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, fieldNames));
+            final String errorMessage = "Could not create determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'.";
+            assertEquals(errorMessage, exception.getMessage());
+        }
+    }
+
+    // Converts the named fixture for the given field names and compares the outcome with the expected schema.
+    private static void assertConvertsTo(final RecordSchema expected, final String schemaFileName, final String fieldNames) throws IOException {
+        try (final InputStream sfSchema = readFile(TEST_PATH + schemaFileName)) {
+            final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
+
+            assertEquals(expected, actual);
+        }
+    }
+
+    // Opens a fixture by its path relative to the module directory.
+    private static InputStream readFile(final String path) throws IOException {
+        return new FileInputStream(path);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json
new file mode 100644
index 0000000000..cc62d1b51f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json
@@ -0,0 +1,16 @@
+{
+  "fields": [
+    {
+      "defaultValue": null,
+      "name": "ExampleAddress",
+      "nillable": true,
+      "soapType": "urn:address"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleLocation",
+      "nillable": true,
+      "soapType": "tns:location"
+    }
+  ]
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json
new file mode 100644
index 0000000000..82073792c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json
@@ -0,0 +1,76 @@
+{
+  "fields": [
+    {
+      "defaultValue": null,
+      "name": "ExampleInt",
+      "nillable": true,
+      "soapType": "xns:int"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleLong",
+      "nillable": true,
+      "soapType": "xns:long"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleDouble",
+      "nillable": true,
+      "soapType": "xns:double"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleBoolean",
+      "nillable": true,
+      "soapType": "xns:boolean"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleID",
+      "nillable": true,
+      "soapType": "xns:ID"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleString",
+      "nillable": true,
+      "soapType": "xns:string"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleJson",
+      "nillable": true,
+      "soapType": "xns:json"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleBase64Binary",
+      "nillable": true,
+      "soapType": "xns:base64Binary"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleAnyType",
+      "nillable": true,
+      "soapType": "xns:anyType"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleDate",
+      "nillable": true,
+      "soapType": "xsd:date"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleDateTime",
+      "nillable": true,
+      "soapType": "xsd:dateTime"
+    },
+    {
+      "defaultValue": null,
+      "name": "ExampleTime",
+      "nillable": true,
+      "soapType": "xsd:time"
+    }
+  ]
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json
new file mode 100644
index 0000000000..4416a35f46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json
@@ -0,0 +1,11 @@
+{
+  "name": "SObjectWithUnknownFieldType",
+  "fields": [
+    {
+      "defaultValue": null,
+      "name": "FieldWithUnknownType",
+      "nillable": true,
+      "soapType": "xsd:unknown"
+    }
+  ]
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml
new file mode 100644
index 0000000000..f07b3c1a6f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.17.0-SNAPSHOT</version>
+    </parent>
+    <!-- POM-packaged parent that lists the Salesforce NAR and processors modules. -->
+    <artifactId>nifi-salesforce-bundle</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-salesforce-nar</module>
+        <module>nifi-salesforce-processors</module>
+    </modules>
+</project>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index fa49f9c1dc..e92f54ff3e 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -110,6 +110,7 @@
         <module>nifi-stateless-processor-bundle</module>
         <module>nifi-geohash-bundle</module>
         <module>nifi-snowflake-bundle</module>
+        <module>nifi-salesforce-bundle</module>
     </modules>
 
     <build>