You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/04/20 19:03:30 UTC
[nifi] branch main updated: NIFI-9720 Added QuerySalesforceObject Processor
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 73ee1a9a1e NIFI-9720 Added QuerySalesforceObject Processor
73ee1a9a1e is described below
commit 73ee1a9a1e355d31f348a82acdfff4a387ef485b
Author: Lehel <Le...@hotmail.com>
AuthorDate: Wed Feb 23 16:07:00 2022 +0100
NIFI-9720 Added QuerySalesforceObject Processor
This closes #5802
Signed-off-by: David Handermann <ex...@apache.org>
---
nifi-assembly/pom.xml | 6 +
.../nifi-salesforce-nar/pom.xml | 46 ++
.../nifi-salesforce-processors/pom.xml | 122 ++++++
.../salesforce/QuerySalesforceObject.java | 486 +++++++++++++++++++++
.../salesforce/util/SalesforceRestService.java | 91 ++++
.../util/SalesforceToRecordSchemaConverter.java | 120 +++++
.../services/org.apache.nifi.processor.Processor | 15 +
.../salesforce/QuerySalesforceObjectIT.java | 79 ++++
.../salesforce/util/SalesforceConfigAware.java | 57 +++
.../salesforce/util/SalesforceRestServiceIT.java | 80 ++++
.../SalesforceToRecordSchemaConverterTest.java | 169 +++++++
.../resources/converter/complex_sf_schema.json | 16 +
.../test/resources/converter/simple_sf_schema.json | 76 ++++
.../converter/unknown_type_sf_schema.json | 11 +
nifi-nar-bundles/nifi-salesforce-bundle/pom.xml | 32 ++
nifi-nar-bundles/pom.xml | 1 +
16 files changed, 1407 insertions(+)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index becd82c40e..0501212288 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -839,6 +839,12 @@ language governing permissions and limitations under the License. -->
<version>1.17.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-salesforce-nar</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<!-- dependencies for jaxb/activation/annotation for running NiFi on Java 11 -->
<!-- TODO: remove these once minimum Java version is 11 -->
<dependency>
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml
new file mode 100644
index 0000000000..68baa4d07d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-salesforce-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.17.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-salesforce-nar</artifactId>
+
+ <packaging>nar</packaging>
+ <properties>
+ <maven.javadoc.skip>true</maven.javadoc.skip>
+ <source.skip>true</source.skip>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-salesforce-processors</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services-nar</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml
new file mode 100644
index 0000000000..07071778ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-salesforce-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.17.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-salesforce-processors</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-salesforce</artifactId>
+ <version>3.14.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jsonSchema</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <!-- Provided from parent nar -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-service</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <!-- test data -->
+ <exclude>src/test/resources/**/*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
new file mode 100644
index 0000000000..89d675e1fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.processors.salesforce.util.SalesforceToRecordSchemaConverter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@TriggerWhenEmpty
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"salesforce", "sobject", "soql", "query"})
+@CapabilityDescription("Retrieves records from a Salesforce sObject. Users can add arbitrary filter conditions by setting the 'Custom WHERE Condition' property."
+ + " Supports incremental retrieval: users can define a field in the 'Age Field' property that will be used to determine when the record was created."
+ + " When this property is set the processor will retrieve new records. It's also possible to define an initial cutoff value for the age, fitering out all older records"
+ + " even for the first run. This processor is intended to be run on the Primary Node only."
+ + " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output.")
+@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, after performing a query the time of execution is stored. Subsequent queries will be augmented"
+ + " with an additional condition so that only records that are newer than the stored execution time (adjusted with the optional value of 'Age Delay') will be retrieved."
+ + " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
+ + " the new node can pick up where the previous node left off, without duplicating the data.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+ @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
+})
+public class QuerySalesforceObject extends AbstractProcessor {
+
+ static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+ .name("salesforce-url")
+ .displayName("URL")
+ .description("The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+ .required(true)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+ .name("salesforce-api-version")
+ .displayName("API Version")
+ .description("The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+ .required(true)
+ .addValidator(StandardValidators.NUMBER_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("54.0")
+ .build();
+
+ static final PropertyDescriptor SOBJECT_NAME = new PropertyDescriptor.Builder()
+ .name("sobject-name")
+ .displayName("sObject Name")
+ .description("The Salesforce sObject to be queried")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor FIELD_NAMES = new PropertyDescriptor.Builder()
+ .name("field-names")
+ .displayName("Field Names")
+ .description("Comma-separated list of field names requested from the sObject to be queried")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("read-timeout")
+ .displayName("Read Timeout")
+ .description("Maximum time allowed for reading a response from the Salesforce REST API")
+ .required(true)
+ .defaultValue("15 s")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+ .name("oauth2-access-token-provider")
+ .displayName("OAuth2 Access Token Provider")
+ .description("Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("Service used for writing records returned from the Salesforce REST API")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor CREATE_ZERO_RECORD_FILES = new PropertyDescriptor.Builder()
+ .name("create-zero-record-files")
+ .displayName("Create Zero Record FlowFiles")
+ .description("Specifies whether or not to create a FlowFile when the Salesforce REST API does not return any records")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor AGE_FIELD = new PropertyDescriptor.Builder()
+ .name("age-field")
+ .displayName("Age Field")
+ .description("The name of a TIMESTAMP field that will be used to limit all and filter already retrieved records."
+ + " Only records that are older than the previous run time of this processor will be retrieved."
+ )
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor AGE_DELAY = new PropertyDescriptor.Builder()
+ .name("age-delay")
+ .displayName("Age Delay")
+ .description("When 'Age Field' is set the age-based filter will be adjusted by this amount."
+ + " Only records that are older than the previous run time of this processor, by at least this amount, will be retrieved."
+ )
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .dependsOn(AGE_FIELD)
+ .build();
+
+ static final PropertyDescriptor INITIAL_AGE_FILTER = new PropertyDescriptor.Builder()
+ .name("initial-age-filter")
+ .displayName("Initial Age Filter")
+ .description("When 'Age Field' is set the value of this property will serve as a filter when this processor runs the first time."
+ + " Only records that are older than this value be retrieved."
+ )
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AGE_FIELD)
+ .build();
+
+ static final PropertyDescriptor CUSTOM_WHERE_CONDITION = new PropertyDescriptor.Builder()
+ .name("custom-where-condition")
+ .displayName("Custom WHERE Condition")
+ .description("A custom expression to be added in the WHERE clause of the query")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("For FlowFiles created as a result of a successful query.")
+ .build();
+
+ private static final String LAST_AGE_FILTER = "last_age_filter";
+ private static final String STARTING_FIELD_NAME = "records";
+ private static final String DATE_FORMAT = "yyyy-MM-dd";
+ private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
+ private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
+
+ private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
+ private volatile SalesforceRestService salesforceRestService;
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ salesForceToRecordSchemaConverter = new SalesforceToRecordSchemaConverter(
+ DATE_FORMAT,
+ DATE_TIME_FORMAT,
+ TIME_FORMAT
+ );
+
+ String salesforceVersion = context.getProperty(API_VERSION).getValue();
+ String baseUrl = context.getProperty(API_URL).getValue();
+ OAuth2AccessTokenProvider accessTokenProvider = context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+ salesforceRestService = new SalesforceRestService(
+ salesforceVersion,
+ baseUrl,
+ () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+ context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()
+ );
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Collections.unmodifiableList(Arrays.asList(
+ API_URL,
+ API_VERSION,
+ SOBJECT_NAME,
+ FIELD_NAMES,
+ READ_TIMEOUT,
+ TOKEN_PROVIDER,
+ RECORD_WRITER,
+ CREATE_ZERO_RECORD_FILES,
+ AGE_FIELD,
+ INITIAL_AGE_FILTER,
+ AGE_DELAY,
+ CUSTOM_WHERE_CONDITION
+ ));
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ return relationships;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+ if (validationContext.getProperty(INITIAL_AGE_FILTER).isSet() && !validationContext.getProperty(AGE_FIELD).isSet()) {
+ results.add(
+ new ValidationResult.Builder()
+ .subject(INITIAL_AGE_FILTER.getDisplayName())
+ .valid(false)
+ .explanation("it requires " + AGE_FIELD.getDisplayName() + " also to be set.")
+ .build()
+ );
+ }
+ return results;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ String sObject = context.getProperty(SOBJECT_NAME).getValue();
+ String fields = context.getProperty(FIELD_NAMES).getValue();
+ String customWhereClause = context.getProperty(CUSTOM_WHERE_CONDITION).getValue();
+ RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ boolean createZeroRecordFlowFiles = context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean();
+
+ String ageField = context.getProperty(AGE_FIELD).getValue();
+ String initialAgeFilter = context.getProperty(INITIAL_AGE_FILTER).getValue();
+ Long ageDelayMs = context.getProperty(AGE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+
+ String ageFilterLower;
+ StateMap state;
+ try {
+ state = context.getStateManager().getState(Scope.CLUSTER);
+ ageFilterLower = state.get(LAST_AGE_FILTER);
+ } catch (IOException e) {
+ throw new ProcessException("Last Age Filter state retrieval failed", e);
+ }
+
+ String ageFilterUpper;
+ if (ageField == null) {
+ ageFilterUpper = null;
+ } else {
+ OffsetDateTime ageFilterUpperTime;
+ if (ageDelayMs == null) {
+ ageFilterUpperTime = OffsetDateTime.now();
+ } else {
+ ageFilterUpperTime = OffsetDateTime.now().minus(ageDelayMs, ChronoUnit.MILLIS);
+ }
+ ageFilterUpper = ageFilterUpperTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ }
+
+ ConvertedSalesforceSchema convertedSalesforceSchema = getConvertedSalesforceSchema(sObject, fields);
+
+ String querySObject = buildQuery(
+ sObject,
+ fields,
+ customWhereClause,
+ ageField,
+ initialAgeFilter,
+ ageFilterLower,
+ ageFilterUpper
+ );
+
+ FlowFile flowFile = session.create();
+
+ Map<String, String> originalAttributes = flowFile.getAttributes();
+ Map<String, String> attributes = new HashMap<>();
+
+ AtomicInteger recordCountHolder = new AtomicInteger();
+
+ flowFile = session.write(flowFile, out -> {
+ try (
+ InputStream querySObjectResultInputStream = salesforceRestService.query(querySObject);
+ JsonTreeRowRecordReader jsonReader = new JsonTreeRowRecordReader(
+ querySObjectResultInputStream,
+ getLogger(),
+ convertedSalesforceSchema.querySObjectResultSchema,
+ DATE_FORMAT,
+ TIME_FORMAT,
+ DATE_TIME_FORMAT,
+ StartingFieldStrategy.NESTED_FIELD,
+ STARTING_FIELD_NAME
+ );
+
+ RecordSetWriter writer = writerFactory.createWriter(
+ getLogger(),
+ writerFactory.getSchema(
+ originalAttributes,
+ convertedSalesforceSchema.recordSchema
+ ),
+ out,
+ originalAttributes
+ )
+ ) {
+ writer.beginRecordSet();
+
+ Record querySObjectRecord;
+ while ((querySObjectRecord = jsonReader.nextRecord()) != null) {
+ writer.write(querySObjectRecord);
+ }
+
+ WriteResult writeResult = writer.finishRecordSet();
+
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+
+ recordCountHolder.set(writeResult.getRecordCount());
+
+ if (ageFilterUpper != null) {
+ Map<String, String> newState = new HashMap<>(state.toMap());
+ newState.put(LAST_AGE_FILTER, ageFilterUpper);
+ updateState(context, newState);
+ }
+ } catch (SchemaNotFoundException e) {
+ throw new ProcessException("Couldn't create record writer", e);
+ } catch (MalformedRecordException e) {
+ throw new ProcessException("Couldn't read records from input", e);
+ }
+ });
+
+ int recordCount = recordCountHolder.get();
+
+ if (!createZeroRecordFlowFiles && recordCount == 0) {
+ session.remove(flowFile);
+ } else {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, REL_SUCCESS);
+
+ session.adjustCounter("Records Processed", recordCount, false);
+ getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
+ }
+ }
+
+ private ConvertedSalesforceSchema getConvertedSalesforceSchema(String sObject, String fields) {
+ try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) {
+ return convertSchema(describeSObjectResult, fields);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Salesforce input stream close failed", e);
+ }
+ }
+
+ private void updateState(ProcessContext context, Map<String, String> newState) {
+ try {
+ context.getStateManager().setState(newState, Scope.CLUSTER);
+ } catch (IOException e) {
+ throw new ProcessException("Last Age Filter state update failed", e);
+ }
+ }
+
+ protected ConvertedSalesforceSchema convertSchema(InputStream describeSObjectResult, String fields) {
+ try {
+ RecordSchema recordSchema = salesForceToRecordSchemaConverter.convertSchema(describeSObjectResult, fields);
+
+ RecordSchema querySObjectResultSchema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField(STARTING_FIELD_NAME, RecordFieldType.ARRAY.getArrayDataType(
+ RecordFieldType.RECORD.getRecordDataType(
+ recordSchema
+ )
+ ))
+ ));
+
+ return new ConvertedSalesforceSchema(querySObjectResultSchema, recordSchema);
+ } catch (IOException e) {
+ throw new ProcessException("SObject to Record schema conversion failed", e);
+ }
+ }
+
+ protected String buildQuery(
+ String sObject,
+ String fields,
+ String customWhereClause,
+ String ageField,
+ String initialAgeFilter,
+ String ageFilterLower,
+ String ageFilterUpper
+ ) {
+ StringBuilder queryBuilder = new StringBuilder("SELECT ")
+ .append(fields)
+ .append(" FROM ")
+ .append(sObject);
+
+ List<String> whereItems = new ArrayList<>();
+ if (customWhereClause != null) {
+ whereItems.add("( " + customWhereClause + " )");
+ }
+
+ if (ageField != null) {
+ if (ageFilterLower != null) {
+ whereItems.add(ageField + " >= " + ageFilterLower);
+ } else if (initialAgeFilter != null) {
+ whereItems.add(ageField + " >= " + initialAgeFilter);
+ }
+
+ whereItems.add(ageField + " < " + ageFilterUpper);
+ }
+
+ if (!whereItems.isEmpty()) {
+ String finalWhereClause = String.join(" AND ", whereItems);
+ queryBuilder.append(" WHERE ").append(finalWhereClause);
+ }
+
+ return queryBuilder.toString();
+ }
+
+ static class ConvertedSalesforceSchema {
+ RecordSchema querySObjectResultSchema;
+ RecordSchema recordSchema;
+
+ public ConvertedSalesforceSchema(RecordSchema querySObjectResultSchema, RecordSchema recordSchema) {
+ this.querySObjectResultSchema = querySObjectResultSchema;
+ this.recordSchema = recordSchema;
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
new file mode 100644
index 0000000000..bc3f746158
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import okhttp3.HttpUrl;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+public class SalesforceRestService {
+ private final String version;
+ private final String baseUrl;
+ private final Supplier<String> accessTokenProvider;
+ private final OkHttpClient httpClient;
+
+ public SalesforceRestService(String version, String baseUrl, Supplier<String> accessTokenProvider, int responseTimeoutMillis) {
+ this.version = version;
+ this.baseUrl = baseUrl;
+ this.accessTokenProvider = accessTokenProvider;
+ httpClient = new OkHttpClient.Builder()
+ .readTimeout(responseTimeoutMillis, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ public InputStream describeSObject(String sObject) {
+ String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1";
+
+ Request request = new Request.Builder()
+ .addHeader("Authorization", "Bearer " + accessTokenProvider.get())
+ .url(url)
+ .get()
+ .build();
+
+ return request(request);
+ }
+
+ public InputStream query(String query) {
+ String url = baseUrl + "/services/data/v" + version + "/query";
+
+ HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
+ .addQueryParameter("q", query)
+ .build();
+
+ Request request = new Request.Builder()
+ .addHeader("Authorization", "Bearer " + accessTokenProvider.get())
+ .url(httpUrl)
+ .get()
+ .build();
+
+ return request(request);
+ }
+
+ private InputStream request(Request request) {
+ Response response = null;
+ try {
+ response = httpClient.newCall(request).execute();
+ if (response.code() != 200) {
+ throw new ProcessException("Invalid response" +
+ " Code: " + response.code() +
+ " Message: " + response.message() +
+ " Body: " + (response.body() == null ? null : response.body().string())
+ );
+ }
+ return response.body().byteStream();
+ } catch (Exception e) {
+ if (response != null) {
+ response.close();
+ }
+ throw new ProcessException(String.format("Salesforce HTTP request failed [%s]", request.url()), e);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
new file mode 100644
index 0000000000..f4396415ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.component.salesforce.api.dto.SObjectDescription;
+import org.apache.camel.component.salesforce.api.dto.SObjectField;
+import org.apache.camel.component.salesforce.api.utils.JsonUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SalesforceToRecordSchemaConverter {
+
+ private final String dateFormat;
+ private final String dateTimeFormat;
+ private final String timeFormat;
+ private final ObjectMapper objectMapper;
+
+ public SalesforceToRecordSchemaConverter(String dateFormat, String dateTimeFormat, String timeFormat) {
+ this.dateFormat = dateFormat;
+ this.dateTimeFormat = dateTimeFormat;
+ this.timeFormat = timeFormat;
+ objectMapper = JsonUtils.createObjectMapper();
+ }
+
+ public RecordSchema convertSchema(final InputStream describeSOjbectResultJsonString, final String fieldNamesOfInterest) throws IOException {
+
+ final SObjectDescription sObjectDescription = objectMapper.readValue(describeSOjbectResultJsonString, SObjectDescription.class);
+ final List<String> listOfFieldNamesOfInterest = Arrays.asList(fieldNamesOfInterest.toLowerCase().split("\\s*,\\s*"));
+ final List<SObjectField> fields = sObjectDescription.getFields()
+ .stream()
+ .filter(sObjectField -> listOfFieldNamesOfInterest.contains(sObjectField.getName().toLowerCase()))
+ .collect(Collectors.toList());
+
+ final List<RecordField> recordFields = new ArrayList<>();
+
+ for (SObjectField field : fields) {
+ final String soapType = field.getSoapType();
+
+ switch (soapType.substring(soapType.indexOf(':') + 1)) {
+ case "ID":
+ case "string":
+ case "json":
+ case "base64Binary":
+ case "anyType":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.STRING.getDataType(), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "int":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.INT.getDataType(), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "long":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.LONG.getDataType(), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "double":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.DOUBLE.getDataType(), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "boolean":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.BOOLEAN.getDataType(), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "date":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.DATE.getDataType(dateFormat), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "dateTime":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.TIMESTAMP.getDataType(dateTimeFormat), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "time":
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.TIME.getDataType(timeFormat), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "address":
+ final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("city", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("country", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("state", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("street", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
+ ));
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(addressSchema), field.getDefaultValue(), field.isNillable()));
+ break;
+ case "location":
+ final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
+ ));
+ recordFields.add(new RecordField(field.getName(), RecordFieldType.RECORD.getRecordDataType(locationSchema), field.getDefaultValue(), field.isNillable()));
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Could not create determine schema for '%s'. Could not convert field '%s' of soap type '%s'.",
+ sObjectDescription.getName(), field.getName(), soapType));
+ }
+ }
+
+ return new SimpleRecordSchema(recordFields);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..503138fb69
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.salesforce.QuerySalesforceObject
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
new file mode 100644
index 0000000000..547b99d54c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Set constants in {@link SalesforceConfigAware}
+ */
+class QuerySalesforceObjectIT implements SalesforceConfigAware {
+ private TestRunner runner;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ Processor querySObject = new QuerySalesforceObject();
+
+ runner = TestRunners.newTestRunner(querySObject);
+
+ StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+ runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
+ }
+
+ @AfterEach
+ void tearDown() {
+ runner.shutdown();
+ }
+
+ @Test
+ void retrievesAndWritesRecords() throws Exception {
+ String sObjectName = "Account";
+ String fieldNames = "Id,name,CreatedDate";
+
+ RecordSetWriterFactory writer = new MockRecordWriter();
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QuerySalesforceObject.SOBJECT_NAME, sObjectName);
+ runner.setProperty(QuerySalesforceObject.FIELD_NAMES, fieldNames);
+ runner.setProperty(QuerySalesforceObject.API_VERSION, VERSION);
+ runner.setProperty(QuerySalesforceObject.API_URL, BASE_URL);
+ runner.setProperty(QuerySalesforceObject.RECORD_WRITER, writer.getIdentifier());
+ runner.setProperty(QuerySalesforceObject.AGE_FIELD, "CreatedDate");
+ runner.setProperty(QuerySalesforceObject.INITIAL_AGE_FILTER, "2022-01-06T08:43:24.000+0000");
+
+ runner.run();
+
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
+
+ assertNotNull(results.get(0).getContent());
+ }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
new file mode 100644
index 0000000000..c5b73e5897
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceConfigAware.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+
+/**
+ * Set the following constants:<p>
+ * VERSION<p>
+ * BASE_URL<p>
+ * USERNAME<p>
+ * PASSWORD<p>
+ * CLIENT_ID<p>
+ * CLIENT_SECRET<p>
+ */
+public interface SalesforceConfigAware {
+ String VERSION = "54.0";
+ String BASE_URL = "https://MyDomainName.my.salesforce.com";
+
+ String AUTHORIZATION_SERVER_URL = BASE_URL + "/services/oauth2/token";
+ String USERNAME = "???";
+ String PASSWORD = "???";
+ String CLIENT_ID = "???";
+ String CLIENT_SECRET = "???";
+
+ default StandardOauth2AccessTokenProvider initOAuth2AccessTokenProvider(TestRunner runner) throws InitializationException {
+ StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = new StandardOauth2AccessTokenProvider();
+
+ runner.addControllerService("oauth2AccessTokenProvider", oauth2AccessTokenProvider);
+
+ runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.AUTHORIZATION_SERVER_URL, AUTHORIZATION_SERVER_URL);
+ runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.USERNAME, USERNAME);
+ runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.PASSWORD, PASSWORD);
+ runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.CLIENT_ID, CLIENT_ID);
+ runner.setProperty(oauth2AccessTokenProvider, StandardOauth2AccessTokenProvider.CLIENT_SECRET, CLIENT_SECRET);
+
+ runner.enableControllerService(oauth2AccessTokenProvider);
+
+ return oauth2AccessTokenProvider;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
new file mode 100644
index 0000000000..a82b9ac863
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceRestServiceIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Set constants in {@link SalesforceConfigAware}
+ */
+class SalesforceRestServiceIT implements SalesforceConfigAware {
+ private TestRunner runner;
+ private SalesforceRestService testSubject;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ runner = TestRunners.newTestRunner(new AbstractSessionFactoryProcessor() {
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+ }
+ });
+
+ StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+
+ testSubject = new SalesforceRestService(
+ VERSION,
+ BASE_URL,
+ () -> oauth2AccessTokenProvider.getAccessDetails().getAccessToken(),
+ 5_000
+ );
+ }
+
+ @AfterEach
+ void tearDown() {
+ runner.shutdown();
+ }
+
+ @Test
+ void describeSObjectSucceeds() throws IOException {
+ try (InputStream describeSObjectResultJson = testSubject.describeSObject("Account")) {
+ assertNotNull(describeSObjectResultJson);
+ }
+ }
+
+ @Test
+ void querySucceeds() throws IOException {
+ String query = "SELECT id,BillingAddress FROM Account";
+
+ try (InputStream querySObjectRecordsResultJson = testSubject.query(query)) {
+ assertNotNull(querySObjectRecordsResultJson);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
new file mode 100644
index 0000000000..f65506a0aa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/SalesforceToRecordSchemaConverterTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.exc.MismatchedInputException;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class SalesforceToRecordSchemaConverterTest {
+
+ private static final String TEST_PATH = "src/test/resources/converter/";
+ private static final String DATE_FORMAT = "yyyy-mm-dd";
+ private static final String TIME_STAMP_FORMAT = "yyyy-mm-dd / hh:mm:ss";
+ private static final String TIME_FORMAT = "hh:mm:ss";
+
+ private static final SalesforceToRecordSchemaConverter converter = new SalesforceToRecordSchemaConverter(DATE_FORMAT, TIME_STAMP_FORMAT, TIME_FORMAT);
+
+ @Test
+ void testConvertSchema() throws IOException {
+ final String salesforceSchemaFileName = "simple_sf_schema.json";
+ final String fieldNames = "ExampleInt,ExampleLong,ExampleDouble,ExampleBoolean," +
+ "ExampleID,ExampleString,ExampleJson,ExampleBase64Binary,ExampleAnyType," +
+ "ExampleDate,ExampleDateTime,ExampleTime";
+
+ final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+ // primitives
+ new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
+ new RecordField("ExampleLong", RecordFieldType.LONG.getDataType()),
+ new RecordField("ExampleDouble", RecordFieldType.DOUBLE.getDataType()),
+ new RecordField("ExampleBoolean", RecordFieldType.BOOLEAN.getDataType()),
+ // string types
+ new RecordField("ExampleID", RecordFieldType.STRING.getDataType()),
+ new RecordField("ExampleString", RecordFieldType.STRING.getDataType()),
+ new RecordField("ExampleJson", RecordFieldType.STRING.getDataType()),
+ new RecordField("ExampleBase64Binary", RecordFieldType.STRING.getDataType()),
+ new RecordField("ExampleAnyType", RecordFieldType.STRING.getDataType()),
+ // date types
+ new RecordField("ExampleDate", RecordFieldType.DATE.getDataType(DATE_FORMAT)),
+ new RecordField("ExampleDateTime", RecordFieldType.TIMESTAMP.getDataType(TIME_STAMP_FORMAT)),
+ new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT))
+ ));
+
+ try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
+ final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ void testConvertComplexTypes() throws IOException {
+ final String salesforceSchemaFileName = "complex_sf_schema.json";
+ final String fieldNames = "ExampleAddress,ExampleLocation";
+
+ final RecordSchema addressSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("city", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("country", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("countryCode", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("postalCode", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("state", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("stateCode", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("street", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("geocodeAccuracy", RecordFieldType.STRING.getDataType(), true)
+ ));
+
+ final RecordSchema locationSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("latitude", RecordFieldType.STRING.getDataType(), true),
+ new RecordField("longitude", RecordFieldType.STRING.getDataType(), true)
+ ));
+
+ RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("ExampleAddress", RecordFieldType.RECORD.getRecordDataType(addressSchema)),
+ new RecordField("ExampleLocation", RecordFieldType.RECORD.getRecordDataType(locationSchema))
+ ));
+
+ try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
+ final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ void testSelectFields() throws IOException {
+ final String salesforceSchemaFileName = "simple_sf_schema.json";
+ final String fieldNames = "ExampleInt,ExampleTime";
+
+ final RecordSchema expected = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("ExampleInt", RecordFieldType.INT.getDataType()),
+ new RecordField("ExampleTime", RecordFieldType.TIME.getDataType(TIME_FORMAT))
+ ));
+
+ try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
+ final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ void testSelectEmptyFields() throws IOException {
+ final String salesforceSchemaFileName = "simple_sf_schema.json";
+ final String fieldNames = "";
+
+ final RecordSchema expected = new SimpleRecordSchema(Collections.emptyList());
+
+ try (final InputStream sfSchema = readFile(TEST_PATH + salesforceSchemaFileName)) {
+ final RecordSchema actual = converter.convertSchema(sfSchema, fieldNames);
+
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ void testConvertEmptySchema() throws IOException {
+ try (final InputStream sfSchema = IOUtils.toInputStream("", Charset.defaultCharset())) {
+ assertThrows(MismatchedInputException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
+ }
+ }
+
+ @Test
+ void testConvertNullSchema() {
+ final InputStream sfSchema = null;
+ assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, "ExampleField"));
+ }
+
+ @Test
+ void testConvertUnknownDataType() throws IOException {
+ try (final InputStream sfSchema = readFile(TEST_PATH + "unknown_type_sf_schema.json")) {
+ final String fieldNames = "FieldWithUnknownType";
+ final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> converter.convertSchema(sfSchema, fieldNames));
+ final String errorMessage = "Could not create determine schema for 'SObjectWithUnknownFieldType'. Could not convert field 'FieldWithUnknownType' of soap type 'xsd:unknown'.";
+ assertEquals(errorMessage, exception.getMessage());
+ }
+ }
+
+ private InputStream readFile(final String path) throws IOException {
+ return new FileInputStream(path);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json
new file mode 100644
index 0000000000..cc62d1b51f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/complex_sf_schema.json
@@ -0,0 +1,16 @@
+{
+ "fields": [
+ {
+ "defaultValue": null,
+ "name": "ExampleAddress",
+ "nillable": true,
+ "soapType": "urn:address"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleLocation",
+ "nillable": true,
+ "soapType": "tns:location"
+ }
+ ]
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json
new file mode 100644
index 0000000000..82073792c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/simple_sf_schema.json
@@ -0,0 +1,76 @@
+{
+ "fields": [
+ {
+ "defaultValue": null,
+ "name": "ExampleInt",
+ "nillable": true,
+ "soapType": "xns:int"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleLong",
+ "nillable": true,
+ "soapType": "xns:long"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleDouble",
+ "nillable": true,
+ "soapType": "xns:double"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleBoolean",
+ "nillable": true,
+ "soapType": "xns:boolean"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleID",
+ "nillable": true,
+ "soapType": "xns:ID"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleString",
+ "nillable": true,
+ "soapType": "xns:string"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleJson",
+ "nillable": true,
+ "soapType": "xns:json"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleBase64Binary",
+ "nillable": true,
+ "soapType": "xns:base64Binary"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleAnyType",
+ "nillable": true,
+ "soapType": "xns:anyType"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleDate",
+ "nillable": true,
+ "soapType": "xsd:date"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleDateTime",
+ "nillable": true,
+ "soapType": "xsd:dateTime"
+ },
+ {
+ "defaultValue": null,
+ "name": "ExampleTime",
+ "nillable": true,
+ "soapType": "xsd:time"
+ }
+ ]
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json
new file mode 100644
index 0000000000..4416a35f46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/resources/converter/unknown_type_sf_schema.json
@@ -0,0 +1,11 @@
+{
+ "name": "SObjectWithUnknownFieldType",
+ "fields": [
+ {
+ "defaultValue": null,
+ "name": "FieldWithUnknownType",
+ "nillable": true,
+ "soapType": "xsd:unknown"
+ }
+ ]
+}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml b/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml
new file mode 100644
index 0000000000..f07b3c1a6f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-nar-bundles</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.17.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-salesforce-bundle</artifactId>
+ <packaging>pom</packaging>
+ <modules>
+ <module>nifi-salesforce-nar</module>
+ <module>nifi-salesforce-processors</module>
+ </modules>
+</project>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index fa49f9c1dc..e92f54ff3e 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -110,6 +110,7 @@
<module>nifi-stateless-processor-bundle</module>
<module>nifi-geohash-bundle</module>
<module>nifi-snowflake-bundle</module>
+ <module>nifi-salesforce-bundle</module>
</modules>
<build>