You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/16 20:52:54 UTC

[GitHub] [nifi] Lehel44 opened a new pull request, #6670: NIFI-10832: Create PutSalesforceRecord processor

Lehel44 opened a new pull request, #6670:
URL: https://github.com/apache/nifi/pull/6670

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10832](https://issues.apache.org/jira/browse/NIFI-10832)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI-10832) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on pull request #6670: NIFI-10832: Create PutSalesforceRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on PR #6670:
URL: https://github.com/apache/nifi/pull/6670#issuecomment-1318907094

   Hi @nandorsoma 
   Thanks for comment. The record in processor name refers to the "Salesforce record". In Salesforce, objects are database tables and records are the rows. Checking with @turcsanyip the processor is gonna be renamed to PutSalesforceObject where the "Object" part means the destination we put the "Salesforce records" to. Please not that a few minor changes will be added soon:
   - Extracting common processor properties
   - LICENSE/NOTICE files
   - additionalDetails.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on PR #6670:
URL: https://github.com/apache/nifi/pull/6670#issuecomment-1348708673

   LGTM
   Thanks for your work @Lehel44 and @turcsanyip for closing the PR (after I forgot it during the merge).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1046534890


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute. This processor cannot update existing records.")
+@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+public class PutSalesforceObject extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
+
+            int count = 0;
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count++;
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record);
+                writer.write(extendedRecord);
+
+                if (count == maxRecordCount) {
+                    count = 0;
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+
+        } catch (MalformedRecordException e) {

Review Comment:
   Thanks for pointing out. I generalized the IOException catch branch to handle Exceptions since the error message would be the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1046116928


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute. This processor cannot update existing records.")
+@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+public class PutSalesforceObject extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
+
+            int count = 0;
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count++;
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record);
+                writer.write(extendedRecord);
+
+                if (count == maxRecordCount) {
+                    count = 0;
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+
+        } catch (MalformedRecordException e) {

Review Comment:
   Sorry for not noticing this before. When a non-handled exception is thrown the session is rolled back and the flowfile will be tired again - likely resulting and endless loop of failures.
   
   Maybe the IOException could warrant such an approach but I'd say it would be too risky even in that case. The current implementation sends the flowfile to failure in that case as well and probably for the better.
   
   In any case, we should catch a general Exception as well and handle similarly (log and send to failure).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1046116928


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute. This processor cannot update existing records.")
+@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+public class PutSalesforceObject extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
+
+            int count = 0;
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count++;
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record);
+                writer.write(extendedRecord);
+
+                if (count == maxRecordCount) {
+                    count = 0;
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+
+        } catch (MalformedRecordException e) {

Review Comment:
   Sorry for not noticing this before. When a non-handled exception is thrown the session is rolled back and the flowfile will be tried again later - resulting in a likely endless loop of failures.
   
   Maybe the IOException could warrant such an approach but I'd say it would be too risky even in that case. The current implementation sends the flowfile to failure in that case as well and probably for the better.
   
   In any case, we should catch a general Exception as well and handle similarly (log and send to failure).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] exceptionfactory commented on pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
exceptionfactory commented on PR #6670:
URL: https://github.com/apache/nifi/pull/6670#issuecomment-1349199366

   @turcsanyip It looks like the commit did not get applied to the `main` branch, and instead references a different branch named `commit`:
   
   https://gitbox.apache.org/repos/asf?p=nifi.git;h=df7ed8ee3f
   
   Can you check the `main` branch and update as needed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1035006046


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class RecordExtender {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    static final SimpleRecordSchema ATTRIBUTES_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("type", RecordFieldType.STRING.getDataType()),
+            new RecordField("referenceId", RecordFieldType.STRING.getDataType())
+    ));
+
+    private final RecordSchema extendedSchema;
+
+    public RecordExtender(final RecordSchema originalSchema) {
+        extendedSchema = getExtendedSchema(originalSchema);

Review Comment:
   ```suggestion
           List<RecordField> recordFields = new ArrayList<>(originalSchema.getFields());
           recordFields.add(new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType(
                   ATTRIBUTES_RECORD_SCHEMA
           )));
           
           extendedSchema = new SimpleRecordSchema(recordFields);
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static ObjectMapper OBJECT_MAPPER;
+    private static RecordSchema ORIGINAL_SCHEMA;
+    private static RecordSchema EXPECTED_EXTENDED_SCHEMA;
+
+    @BeforeAll
+    public static void setup() {
+        OBJECT_MAPPER = new ObjectMapper();
+        ORIGINAL_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()),
+                new RecordField("testRecordField2", RecordFieldType.STRING.getDataType())
+        ));
+        EXPECTED_EXTENDED_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()),
+                new RecordField("testRecordField2", RecordFieldType.STRING.getDataType()),
+                new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
+                        new RecordField("type", RecordFieldType.STRING.getDataType()),
+                        new RecordField("referenceId", RecordFieldType.STRING.getDataType()
+                        )))))
+        ));
+    }
+
+    private RecordExtender testSubject;
+
+    @BeforeEach
+    public void init() {
+        testSubject = new RecordExtender(ORIGINAL_SCHEMA);
+    }
+
+    @Test
+    void testGetWrappedRecordJson() throws IOException {
+        ObjectNode testNode = OBJECT_MAPPER.createObjectNode();
+        testNode.put("testField1", "testValue1");
+        testNode.put("testField2", "testValue2");
+
+        ObjectNode expectedWrappedNode = OBJECT_MAPPER.createObjectNode();
+        expectedWrappedNode.set("records", testNode);
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(testNode.toString().getBytes());
+
+        ObjectNode actualWrappedJson = testSubject.getWrappedRecordsJson(out);
+
+        assertEquals(expectedWrappedNode, actualWrappedJson);
+    }
+
+    @Test
+    void testGetExtendedSchema() {
+        final SimpleRecordSchema actualExtendedSchema = testSubject.getExtendedSchema(ORIGINAL_SCHEMA);

Review Comment:
   ```suggestion
           final RecordSchema actualExtendedSchema = testSubject.getExtendedSchema();
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class RecordExtender {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    static final SimpleRecordSchema ATTRIBUTES_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("type", RecordFieldType.STRING.getDataType()),
+            new RecordField("referenceId", RecordFieldType.STRING.getDataType())
+    ));
+
+    private final RecordSchema extendedSchema;
+
+    public RecordExtender(final RecordSchema originalSchema) {
+        extendedSchema = getExtendedSchema(originalSchema);
+    }
+
+    public ObjectNode getWrappedRecordsJson(ByteArrayOutputStream out) throws IOException {
+        ObjectNode root = MAPPER.createObjectNode();
+        JsonNode jsonNode = MAPPER.readTree(out.toByteArray());
+        root.set("records", jsonNode);
+        return root;
+    }
+
+    public MapRecord getExtendedRecord(String objectType, int count, Record record) {
+
+        Set<String> rawFieldNames = record.getRawFieldNames();
+        Map<String, Object> objectMap = rawFieldNames.stream()
+                .collect(Collectors.toMap(Function.identity(), record::getValue));
+
+        Map<String, Object> attributesMap = new HashMap<>();
+        attributesMap.put("type", objectType);
+        attributesMap.put("referenceId", count);
+
+        MapRecord attributesRecord = new MapRecord(ATTRIBUTES_RECORD_SCHEMA, attributesMap);
+
+        objectMap.put("attributes", attributesRecord);
+
+        return new MapRecord(extendedSchema, objectMap);
+    }
+
+    public SimpleRecordSchema getExtendedSchema(RecordSchema original) {
+        List<RecordField> recordFields = new ArrayList<>(original.getFields());
+        recordFields.add(new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType(
+                ATTRIBUTES_RECORD_SCHEMA
+        )));
+        return new SimpleRecordSchema(recordFields);
+    }
+

Review Comment:
   This can further be simplified. We truly don't need to do this more than once.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+public class PutSalesforceObject extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender = new RecordExtender(reader.getSchema()), out)) {

Review Comment:
   ```suggestion
                WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+public class PutSalesforceObject extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender = new RecordExtender(reader.getSchema()), out)) {
+
+            int count = 0;
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count++;
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record);
+                writer.write(extendedRecord);
+
+                if (count == maxRecordCount) {
+                    count = 0;
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input", e);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer", e);
+        } catch (IOException e) {
+            getLogger().error("Failed to put records to Salesforce.", e);
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private void processRecords(String objectType, ByteArrayOutputStream out, WriteJsonResult writer, RecordExtender extender) throws IOException {
+        writer.finishRecordSet();
+        writer.flush();
+        ObjectNode wrappedJson = extender.getWrappedRecordsJson(out);
+        salesforceRestService.postRecord(objectType, wrappedJson.toPrettyString());
+    }
+
+    private WriteJsonResult getWriter(RecordSchema originalSchema, RecordExtender extender, ByteArrayOutputStream out) throws IOException {
+        final SimpleRecordSchema extendedSchema = extender.getExtendedSchema(originalSchema);

Review Comment:
   ```suggestion
       private WriteJsonResult getWriter(RecordExtender extender, ByteArrayOutputStream out) throws IOException {
           final RecordSchema extendedSchema = extender.getExtendedSchema();
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+public class PutSalesforceObject extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender;
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender = new RecordExtender(reader.getSchema()), out)) {
+
+            int count = 0;
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count++;
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, count, record);
+                writer.write(extendedRecord);
+
+                if (count == maxRecordCount) {
+                    count = 0;
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+
+        } catch (MalformedRecordException e) {
+            getLogger().error("Couldn't read records from input", e);
+            session.transfer(flowFile, REL_FAILURE);
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Couldn't create record writer", e);
+        } catch (IOException e) {
+            getLogger().error("Failed to put records to Salesforce.", e);
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+    }

Review Comment:
   I think the success/failure handling is not working as intended.
   Maybe this is the correct one:
   ```suggestion
               if (writer.isActiveRecordSet()) {
                   processRecords(objectType, out, writer, extender);
               }
   
               session.transfer(flowFile, REL_SUCCESS);
   
           } catch (MalformedRecordException e) {
               getLogger().error("Couldn't read records from input", e);
               session.transfer(flowFile, REL_FAILURE);
           } catch (SchemaNotFoundException e) {
               getLogger().error("Couldn't create record writer", e);
               session.transfer(flowFile, REL_FAILURE);
           } catch (IOException e) {
               getLogger().error("Failed to put records to Salesforce.", e);
               session.transfer(flowFile, REL_FAILURE);
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceRecord processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1026780290


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class RecordExtender {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    static final SimpleRecordSchema ATTRIBUTES_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("type", RecordFieldType.STRING.getDataType()),
+            new RecordField("referenceId", RecordFieldType.STRING.getDataType())
+    ));
+
+    public ObjectNode getWrappedRecordsJson(ByteArrayOutputStream out) throws IOException {
+        ObjectNode root = MAPPER.createObjectNode();
+        JsonNode jsonNode = MAPPER.readTree(out.toByteArray());
+        root.set("records", jsonNode);
+        return root;
+    }
+
+    public MapRecord getExtendedRecord(String objectType, RecordSchema originalSchema, int count, Record record) {
+        SimpleRecordSchema extendedSchema = getExtendedSchema(originalSchema);
+
+        Set<String> rawFieldNames = record.getRawFieldNames();
+        Map<String, Object> objectMap = rawFieldNames.stream()
+                .collect(Collectors.toMap(Function.identity(), record::getValue));
+
+        Map<String, Object> attributesMap = new HashMap<>();
+        attributesMap.put("type", objectType);
+        attributesMap.put("referenceId", count);
+
+        MapRecord attributesRecord = new MapRecord(ATTRIBUTES_RECORD_SCHEMA, attributesMap);
+
+        objectMap.put("attributes", attributesRecord);
+
+        return new MapRecord(extendedSchema, objectMap);
+    }
+
+    public SimpleRecordSchema getExtendedSchema(RecordSchema original) {

Review Comment:
   This would be better done once, in the constructor for example.
   
   A challenge when it needs to be instantiated in a try-with-resources block without implementing AutoClosable but there's a trick we can use:
   ```java
           RecordExtender extender;
           
           try (InputStream in = session.read(flowFile);
                RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                WriteJsonResult writer = getWriter(extender = new RecordExtender(reader.getSchema()), out)) {
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java:
##########
@@ -84,11 +86,28 @@ public InputStream getNextRecords(String nextRecordsUrl) {
         return request(request);
     }
 
+    public InputStream postRecord(String sObjectApiName, String body) {
+        String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName;
+
+        HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
+                .build();
+
+        final RequestBody requestBody = RequestBody.create(body, MediaType.parse("application/json"));
+
+        Request request = new Request.Builder()
+                .addHeader("Authorization", "Bearer " + accessTokenProvider.get())
+                .url(httpUrl)
+                .post(requestBody)
+                .build();
+
+        return request(request);
+    }
+
     private InputStream request(Request request) {
         Response response = null;
         try {
             response = httpClient.newCall(request).execute();
-            if (response.code() != 200) {
+            if (response.code() < 200 || response.code() >= 400) {

Review Comment:
   I'm not sure about this expansion. I assume the new endpoint returns a 201. I think we should allow only that (apart from 200), or maybe 2xx.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+public class PutSalesforceRecord extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description(
+                    "The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description(
+                    "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+            .required(true)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("54.0")
+            .build();
+
+    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description(
+                    "Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");

Review Comment:
   Please add a `@ReadsAttribute` annotation on the class with a description that explains what this is supposed to be.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+public class PutSalesforceRecord extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description(
+                    "The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description(
+                    "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+            .required(true)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("54.0")
+            .build();
+
+    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description(
+                    "Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found");

Review Comment:
   For easier error handling the message should state that the object type was not found _among the incoming flowfile attributes_ - as such it's okay to refer to the actual attribute name.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+public class PutSalesforceRecord extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description(
+                    "The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description(
+                    "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+            .required(true)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("54.0")
+            .build();
+
+    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description(
+                    "Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender = new RecordExtender();
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender, out)) {
+
+            AtomicInteger count = new AtomicInteger();
+            RecordSchema originalSchema = reader.getSchema();
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count.incrementAndGet();
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, originalSchema, count.get(), record);
+                writer.write(extendedRecord);
+
+                if (count.compareAndSet(maxRecordCount, 0)) {
+                    processRecords(objectType, out, writer, extender);
+                    out.reset();
+                }
+            }
+
+            if (writer.isActiveRecordSet()) {
+                processRecords(objectType, out, writer, extender);
+            }
+
+        } catch (Exception ex) {

Review Comment:
   For error handling purposes we it would be better to distinguish between IOException, MalformedRecordException and SchemaNotFoundException.
   
   In all 3 cases the flowfile can go to REL_FAILURE but the message could be different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1043518617


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_URL;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.API_VERSION;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.READ_TIMEOUT;
+import static org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties.TOKEN_PROVIDER;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"

Review Comment:
   "Post" feels too much of an HTML technical term.
   Also if we don't want to support updating existing records for now, we should indicate this in the description.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.PutSalesforceObject/additionalDetails.html:
##########
@@ -0,0 +1,69 @@
+<!DOCTYPE html>
+<html lang="en" xmlns="http://www.w3.org/1999/html">
+<!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+<head>
+    <meta charset="utf-8"/>
+    <title>PutSalesforceObject</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
+    <style>
+h2 {margin-top: 4em}
+h3 {margin-top: 3em}
+td {text-align: left}
+    </style>
+</head>
+
+<body>
+
+<h1>QuerySalesforceObject</h1>
+
+<h3>Description</h3>
+
+<p>
+    Objects in Salesforce are database tables, their rows are known as records, and their columns are called fields. The PutSalesforceObject creates a new a Salesforce record in a Salesforce object.
+    The Salesforce object must be set as the "objectType" attribute of an incoming flowfile. Check <a href="https://developer.salesforce.com/docs/atlas.en-us.object_reference.meta/object_reference/sforce_api_objects_list.htm">Salesforce documentation</a> for object types and metadata.
+    The processor utilizes NiFi record-based processing to allow arbitrary input format.
+</p>
+
+<h4>Example</h4>
+
+<p>
+    If the "objectType" is set to "Account", the following JSON input will create two records on the Account object with the names
+    "SampleAccount1" and "SampleAccount2".
+
+    <code class="prettyprint">

Review Comment:
   When viewing in a browser the JSON is not formatted. Probably should use the `<pre>` tag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip commented on pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on PR #6670:
URL: https://github.com/apache/nifi/pull/6670#issuecomment-1348704501

   Closing PR manually due to missing PR reference in the commit message.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on pull request #6670: NIFI-10832: Create PutSalesforceRecord processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on PR #6670:
URL: https://github.com/apache/nifi/pull/6670#issuecomment-1318909819

   That's a good idea, this way it will be in sync with the `QuerySalesforceObject` processor.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on pull request #6670: NIFI-10832: Create PutSalesforceRecord processor

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on PR #6670:
URL: https://github.com/apache/nifi/pull/6670#issuecomment-1318899146

   Hey @Lehel44!
   I'll review this one, but before that, one thing. We no longer separate "standard" and "record-based" processors. We do record processing when Record Reader/Writer properties are set. Otherwise, we process the whole FlowFile content in one piece. Because of that, I'd remove the Record part from the processor's name and add "record" into the `@Tag` attribute. If we ever want to support standard processing, this way, we leave it as an option because otherwise, we would have to rename the processor, which is a breaking change, as I know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] turcsanyip closed pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
turcsanyip closed pull request #6670: NIFI-10832: Create PutSalesforceObject processor
URL: https://github.com/apache/nifi/pull/6670


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceRecord processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1028016451


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static RecordExtender EXTENDER;
+    private static ObjectMapper OBJECT_MAPPER;
+    private static RecordSchema TEST_RECORD_SCHEMA;
+    private static RecordSchema EXTENDED_TEST_RECORD_SCHEMA;
+
+    @BeforeAll
+    public static void setup() {
+        EXTENDER = new RecordExtender();
+        OBJECT_MAPPER = new ObjectMapper();
+        TEST_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()),
+                new RecordField("testRecordField2", RecordFieldType.STRING.getDataType())
+        ));
+        EXTENDED_TEST_RECORD_SCHEMA = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("testRecordField1", RecordFieldType.STRING.getDataType()),
+                new RecordField("testRecordField2", RecordFieldType.STRING.getDataType()),
+                new RecordField("attributes", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
+                        new RecordField("type", RecordFieldType.STRING.getDataType()),
+                        new RecordField("referenceId", RecordFieldType.STRING.getDataType()
+                        )))))
+        ));
+    }
+
+    @Test
+    void testGetWrappedRecordJson() throws IOException {
+        ObjectNode testNode = OBJECT_MAPPER.createObjectNode();
+        testNode.put("testField1", "testValue1");
+        testNode.put("testField2", "testValue2");
+
+        ObjectNode expectedWrappedNode = OBJECT_MAPPER.createObjectNode();
+        expectedWrappedNode.set("records", testNode);
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        out.write(testNode.toString().getBytes());
+
+        ObjectNode actualWrappedJson = EXTENDER.getWrappedRecordsJson(out);
+
+        assertEquals(expectedWrappedNode, actualWrappedJson);
+    }
+
+    @Test
+    void testGetExtendedSchema() {
+        final SimpleRecordSchema actualRecordSchema = EXTENDER.getExtendedSchema(TEST_RECORD_SCHEMA);
+
+        assertEquals(EXTENDED_TEST_RECORD_SCHEMA, actualRecordSchema);

Review Comment:
   ```suggestion
           final SimpleRecordSchema actualExtendedSchema = EXTENDER.getExtendedSchema(TEST_RECORD_SCHEMA);
   
           assertEquals(EXPECTED_EXTENDED_SCHEMA, actualExtendedSchema);
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());

Review Comment:
   ```suggestion
           runner.setProperty(PutSalesforceRecord.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount);

Review Comment:
   Suggestion: We don't necessarily need a new class for this. `Custom...` is not a particularly descriptive name. An anonymous class shows the intent in a more streamlined way in my opinion.
   ```suggestion
           Processor putSalesforceRecord = new PutSalesforceRecord() {
               @Override
               int getMaxRecordCount() {
                   return 2;
               }
           };
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", "Account"));

Review Comment:
   I think this part (as well as the the later `reader.createRecordReader(flowFile, in, mockComponentLog);` call) is not needed. When you mock the records them the input data is going to be ignored.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static RecordExtender EXTENDER;
+    private static ObjectMapper OBJECT_MAPPER;
+    private static RecordSchema TEST_RECORD_SCHEMA;
+    private static RecordSchema EXTENDED_TEST_RECORD_SCHEMA;

Review Comment:
   ```suggestion
       private static RecordSchema ORIGINAL_SCHEMA;
       private static RecordSchema EXPECTED_EXTENDED_SCHEMA;
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", "Account"));
+
+        MockRecordParser reader = new MockRecordParser();
+        reader.addSchemaField("name", RecordFieldType.STRING);
+        reader.addSchemaField("phone", RecordFieldType.STRING);
+        reader.addSchemaField("website", RecordFieldType.STRING);
+        reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
+        reader.addSchemaField("industry", RecordFieldType.STRING);
+
+        reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking");
+        reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking");
+        reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking");
+        reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking");
+        reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking");
+
+        reader.createRecordReader(flowFile, in, mockComponentLog);
+
+        runner.addControllerService("reader", reader);
+        runner.enableControllerService(reader);
+
+        runner.setProperty(PutSalesforceRecord.API_VERSION, VERSION);
+        runner.setProperty(PutSalesforceRecord.API_URL, BASE_URL);
+        runner.setProperty(PutSalesforceRecord.RECORD_READER_FACTORY, reader.getIdentifier());
+
+
+        runner.enqueue(flowFile);

Review Comment:
   With my previous suggestion.
   ```suggestion
           runner.enqueue("", Collections.singletonMap("objectType", "Account"));
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", "Account"));
+
+        MockRecordParser reader = new MockRecordParser();
+        reader.addSchemaField("name", RecordFieldType.STRING);
+        reader.addSchemaField("phone", RecordFieldType.STRING);
+        reader.addSchemaField("website", RecordFieldType.STRING);
+        reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
+        reader.addSchemaField("industry", RecordFieldType.STRING);
+
+        reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking");
+        reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking");
+        reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking");
+        reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking");
+        reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking");
+
+        reader.createRecordReader(flowFile, in, mockComponentLog);
+
+        runner.addControllerService("reader", reader);
+        runner.enableControllerService(reader);
+
+        runner.setProperty(PutSalesforceRecord.API_VERSION, VERSION);
+        runner.setProperty(PutSalesforceRecord.API_URL, BASE_URL);
+        runner.setProperty(PutSalesforceRecord.RECORD_READER_FACTORY, reader.getIdentifier());
+
+
+        runner.enqueue(flowFile);
+        runner.run();
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);

Review Comment:
   ```suggestion
           List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceRecord.REL_SUCCESS);
   
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceRecord.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.NullSuppression;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.json.OutputGrouping;
+import org.apache.nifi.json.WriteJsonResult;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.salesforce.util.RecordExtender;
+import org.apache.nifi.processors.salesforce.util.SalesforceRestService;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"salesforce", "sobject", "put"})
+@CapabilityDescription("Posts records to a Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+        + " 'objectType' attribute.")
+public class PutSalesforceRecord extends AbstractProcessor {
+
+    private static final int MAX_RECORD_COUNT = 200;
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()
+            .name("salesforce-url")
+            .displayName("URL")
+            .description(
+                    "The URL for the Salesforce REST API including the domain without additional path information, such as https://MyDomainName.my.salesforce.com")
+            .required(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor API_VERSION = new PropertyDescriptor.Builder()
+            .name("salesforce-api-version")
+            .displayName("API Version")
+            .description(
+                    "The version number of the Salesforce REST API appended to the URL after the services/data path. See Salesforce documentation for supported versions")
+            .required(true)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("54.0")
+            .build();
+
+    static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("read-timeout")
+            .displayName("Read Timeout")
+            .description("Maximum time allowed for reading a response from the Salesforce REST API")
+            .required(true)
+            .defaultValue("15 s")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+            .name("oauth2-access-token-provider")
+            .displayName("OAuth2 Access Token Provider")
+            .description(
+                    "Service providing OAuth2 Access Tokens for authenticating using the HTTP Authorization Header")
+            .identifiesControllerService(OAuth2AccessTokenProvider.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description(
+                    "Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("For FlowFiles created as a result of a successful execution.")
+            .build();
+
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("For FlowFiles created as a result of an execution error.")
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+            API_URL,
+            API_VERSION,
+            READ_TIMEOUT,
+            TOKEN_PROVIDER,
+            RECORD_READER_FACTORY
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS,
+            REL_FAILURE
+    )));
+
+    private volatile SalesforceRestService salesforceRestService;
+    private volatile int maxRecordCount;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        maxRecordCount = getMaxRecordCount();
+
+        String salesforceVersion = context.getProperty(API_VERSION).getValue();
+        String baseUrl = context.getProperty(API_URL).getValue();
+        OAuth2AccessTokenProvider accessTokenProvider =
+                context.getProperty(TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
+
+        salesforceRestService = new SalesforceRestService(
+                salesforceVersion,
+                baseUrl,
+                () -> accessTokenProvider.getAccessDetails().getAccessToken(),
+                context.getProperty(READ_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)
+                        .intValue()
+        );
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String objectType = flowFile.getAttribute("objectType");
+        if (objectType == null) {
+            throw new ProcessException("Salesforce object type not found");
+        }
+
+        RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+
+        RecordExtender extender = new RecordExtender();
+
+        try (InputStream in = session.read(flowFile);
+             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+             ByteArrayOutputStream out = new ByteArrayOutputStream();
+             WriteJsonResult writer = getWriter(reader.getSchema(), extender, out)) {
+
+            AtomicInteger count = new AtomicInteger();
+            RecordSchema originalSchema = reader.getSchema();
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                count.incrementAndGet();
+                if (!writer.isActiveRecordSet()) {
+                    writer.beginRecordSet();
+                }
+
+                MapRecord extendedRecord = extender.getExtendedRecord(objectType, originalSchema, count.get(), record);
+                writer.write(extendedRecord);
+
+                if (count.compareAndSet(maxRecordCount, 0)) {

Review Comment:
   `count` is a local variable, cannot be shared among threads. Is there another reason why we need it as an `AtomicInteger`?



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/util/TestRecordExtender.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import static org.apache.nifi.processors.salesforce.util.RecordExtender.ATTRIBUTES_RECORD_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestRecordExtender {
+
+    private static RecordExtender EXTENDER;

Review Comment:
   Suggestion
   Usually the test subject should not be a constant.
   
   Also I like to call it `testSubject` because it's much easier to identify it that way in the tests.
   In general it's a good idea to give name _not_ based on the behaviour of the object (i.e. type) but based on its _purpose_ (in this case its primary purpose is to be tested).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceRecord processor

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1028478436


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceRecordIT.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.salesforce;
+
+import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+class PutSalesforceRecordIT implements SalesforceConfigAware {
+
+    private TestRunner runner;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        int maxRecordCount = 2;
+        Processor putSalesforceRecord = new CustomPutSalesforceRecord(maxRecordCount);
+
+        runner = TestRunners.newTestRunner(putSalesforceRecord);
+
+        StandardOauth2AccessTokenProvider oauth2AccessTokenProvider = initOAuth2AccessTokenProvider(runner);
+        runner.setProperty(QuerySalesforceObject.TOKEN_PROVIDER, oauth2AccessTokenProvider.getIdentifier());
+    }
+
+    @Test
+    void testPutSalesforceRecord() throws Exception {
+        MockComponentLog mockComponentLog = new MockComponentLog("id1", "testPutSalesforceRecord");
+        InputStream in = readFile("src/test/resources/json/put_records.json");
+
+        MockFlowFile flowFile = new MockFlowFile(1L);
+        byte[] fileContent = Files.readAllBytes(Paths.get("src/test/resources/json/put_records.json"));
+        flowFile.setData(fileContent);
+        flowFile.putAttributes(Collections.singletonMap("objectType", "Account"));

Review Comment:
   That's true, thanks for the tip!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] tpalfy commented on a diff in pull request #6670: NIFI-10832: Create PutSalesforceObject processor

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6670:
URL: https://github.com/apache/nifi/pull/6670#discussion_r1043582519


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,101 @@
+nifi-salesforce-nar

Review Comment:
   There are jars in the bundled-dependencies that don't seem to be covered here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org