You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2023/02/28 18:26:58 UTC

[nifi] branch main updated: NIFI-11158 PutSalesforceObject processor improvements

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 60c02225d5 NIFI-11158 PutSalesforceObject processor improvements
60c02225d5 is described below

commit 60c02225d5694efe8f6dab151aa441448fb215c7
Author: krisztina-zsihovszki <zs...@gmail.com>
AuthorDate: Wed Feb 15 12:37:31 2023 +0100

    NIFI-11158 PutSalesforceObject processor improvements
    
    This closes #6959.
    
    Reviewed-by: Lehel <le...@hotmail.com>
    Reviewed-by: Mark Bathori <ba...@gmail.com>
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/util/StandardProcessorTestRunner.java     | 13 ++++-
 .../main/java/org/apache/nifi/util/TestRunner.java |  7 +++
 .../processors/salesforce/PutSalesforceObject.java | 34 ++++++++---
 .../salesforce/QuerySalesforceObject.java          |  8 ++-
 .../processors/salesforce/util/RecordExtender.java |  4 +-
 .../salesforce/util/SalesforceRestService.java     | 15 ++++-
 .../salesforce/PutSalesforceObjectIT.java          | 67 +++++++++++++++++++---
 .../salesforce/QuerySalesforceObjectIT.java        |  2 +
 8 files changed, 126 insertions(+), 24 deletions(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 63290be359..c113817411 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.registry.VariableDescriptor;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.state.MockStateManager;
@@ -72,6 +73,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class StandardProcessorTestRunner implements TestRunner {
@@ -366,7 +368,7 @@ public class StandardProcessorTestRunner implements TestRunner {
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
 
             )
-            .collect(Collectors.toSet());
+            .collect(toSet());
 
         assertEquals(expectedAttributes, actualAttributes);
     }
@@ -1055,4 +1057,13 @@ public class StandardProcessorTestRunner implements TestRunner {
     public void setRunSchedule(long runSchedule) {
         this.runSchedule = runSchedule;
     }
+
+    @Override
+    public void assertProvenanceEvent(final ProvenanceEventType eventType) {
+        Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType);
+        Set<ProvenanceEventType> actualEventTypes = getProvenanceEvents().stream()
+                        .map(ProvenanceEventRecord::getEventType)
+                        .collect(toSet());
+        assertEquals(expectedEventTypes, actualEventTypes);
+    }
 }
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 7b01ed9709..dbe31e73c8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.state.MockStateManager;
 
@@ -1062,4 +1063,10 @@ public interface TestRunner {
      */
      void setRunSchedule(long runSchedule);
 
+    /**
+     * Assert that at least one provenance event was created and that every provenance event has the specified event type.
+     *
+     * @param eventType the only provenance event type expected among the recorded provenance events
+     */
+     void assertProvenanceEvent(ProvenanceEventType eventType);
 }
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
index 00d0c2b783..83605800f9 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
@@ -20,7 +20,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.nifi.NullSuppression;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -64,9 +66,13 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
 @CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
         + " 'objectType' attribute. This processor cannot update existing records.")
 @ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+@WritesAttribute(attribute = "error.message", description = "The error message returned by Salesforce.")
+@SeeAlso(QuerySalesforceObject.class)
 public class PutSalesforceObject extends AbstractProcessor {
 
     private static final int MAX_RECORD_COUNT = 200;
+    private static final String ATTR_OBJECT_TYPE = "objectType";
+    private static final String ATTR_ERROR_MESSAGE = "error.message";
 
     protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
             .name("record-reader")
@@ -138,15 +144,19 @@ public class PutSalesforceObject extends AbstractProcessor {
             return;
         }
 
-        String objectType = flowFile.getAttribute("objectType");
+        String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
         if (objectType == null) {
-            throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+            getLogger().error("Salesforce object type not found among the incoming FlowFile attributes");
+            flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, "Salesforce object type not found among FlowFile attributes");
+            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            return;
         }
 
         RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
 
         RecordExtender extender;
-
+        long startNanos = System.nanoTime();
+        try {
         try (InputStream in = session.read(flowFile);
              RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
              ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -170,24 +180,30 @@ public class PutSalesforceObject extends AbstractProcessor {
                     out.reset();
                 }
             }
-
             if (writer.isActiveRecordSet()) {
                 processRecords(objectType, out, writer, extender);
             }
-            session.transfer(flowFile, REL_SUCCESS);
-
+          }
+          session.transfer(flowFile, REL_SUCCESS);
+          long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+          session.getProvenanceReporter().send(flowFile, salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType, transferMillis);
         } catch (MalformedRecordException e) {
             getLogger().error("Couldn't read records from input", e);
-            session.transfer(flowFile, REL_FAILURE);
+            transferToFailure(session, flowFile, e);
         } catch (SchemaNotFoundException e) {
             getLogger().error("Couldn't create record writer", e);
-            session.transfer(flowFile, REL_FAILURE);
+            transferToFailure(session, flowFile, e);
         } catch (Exception e) {
             getLogger().error("Failed to put records to Salesforce.", e);
-            session.transfer(flowFile, REL_FAILURE);
+            transferToFailure(session, flowFile, e);
         }
     }
 
+    private void transferToFailure(ProcessSession session, FlowFile flowFile, Exception e) {
+        flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, e.getMessage());
+        session.transfer(session.penalize(flowFile), REL_FAILURE);
+    }
+
     private void processRecords(String objectType, ByteArrayOutputStream out, WriteJsonResult writer, RecordExtender extender) throws IOException {
         writer.finishRecordSet();
         writer.flush();
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index 169e3cdd8b..edea559560 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
@@ -115,6 +116,7 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
         @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
 })
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
+@SeeAlso(PutSalesforceObject.class)
 public class QuerySalesforceObject extends AbstractProcessor {
 
     static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
@@ -398,7 +400,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
             Map<String, String> attributes = new HashMap<>();
 
             AtomicInteger recordCountHolder = new AtomicInteger();
-
+            long startNanos = System.nanoTime();
             flowFile = session.write(flowFile, out -> {
                 try (
                         InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
@@ -465,6 +467,10 @@ public class QuerySalesforceObject extends AbstractProcessor {
                 flowFile = session.putAllAttributes(flowFile, attributes);
                 session.transfer(flowFile, REL_SUCCESS);
 
+                long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                session.getProvenanceReporter().receive(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
+                        transferMillis);
+
                 session.adjustCounter("Records Processed", recordCount, false);
                 getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
             }
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java
index 9e8b03e9d5..6af98cd725 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java
@@ -66,7 +66,9 @@ public class RecordExtender {
 
     public MapRecord getExtendedRecord(String objectType, int count, Record record) {
 
-        Set<String> rawFieldNames = record.getRawFieldNames();
+        Set<String> rawFieldNames = record.getRawFieldNames().stream()
+                .filter(fieldName -> record.getValue(fieldName) != null)
+                .collect(Collectors.toSet());
         Map<String, Object> objectMap = rawFieldNames.stream()
                 .collect(Collectors.toMap(Function.identity(), record::getValue));
 
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
index cc3f4b2bf8..0affbc441e 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
@@ -44,7 +44,7 @@ public class SalesforceRestService {
     }
 
     public InputStream describeSObject(String sObject) {
-        String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1";
+        String url = getVersionedBaseUrl() + "/sobjects/" + sObject + "/describe?maxRecords=1";
 
         Request request = new Request.Builder()
                 .addHeader("Authorization", "Bearer " + accessTokenProvider.get())
@@ -56,7 +56,7 @@ public class SalesforceRestService {
     }
 
     public InputStream query(String query) {
-        String url = baseUrl + "/services/data/v" + version + "/query";
+        String url = getVersionedBaseUrl() + "/query";
 
         HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
                 .addQueryParameter("q", query)
@@ -87,7 +87,7 @@ public class SalesforceRestService {
     }
 
     public InputStream postRecord(String sObjectApiName, String body) {
-        String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName;
+        String url = getVersionedBaseUrl() + "/composite/tree/" + sObjectApiName;
 
         HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
                 .build();
@@ -103,6 +103,10 @@ public class SalesforceRestService {
         return request(request);
     }
 
+    public String getVersionedBaseUrl() {
+        return baseUrl + "/services/data/v" + version;
+    }
+
     private InputStream request(Request request) {
         Response response = null;
         try {
@@ -115,6 +119,11 @@ public class SalesforceRestService {
                 );
             }
             return response.body().byteStream();
+        } catch (ProcessException e) {
+            if (response != null) {
+                response.close();
+            }
+            throw e;
         } catch (Exception e) {
             if (response != null) {
                 response.close();
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
index 390fa8ac0f..2c296a119f 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
@@ -20,6 +20,8 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
 import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
@@ -32,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class PutSalesforceObjectIT implements SalesforceConfigAware {
 
@@ -62,25 +65,71 @@ class PutSalesforceObjectIT implements SalesforceConfigAware {
         reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
         reader.addSchemaField("industry", RecordFieldType.STRING);
 
-        reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking");
-        reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking");
+        reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", null, "Banking");
+        reader.addRecord("SampleAccount2", null, "www.salesforce2.com", "200", "Banking");
         reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking");
-        reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking");
+        reader.addRecord("SampleAccount4", "444444", null, "400", "Banking");
         reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking");
 
         runner.enqueue("", Collections.singletonMap("objectType", "Account"));
 
-        runner.addControllerService("reader", reader);
-        runner.enableControllerService(reader);
-
-        runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
-        runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
-        runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier());
+        configureProcessor(reader);
 
         runner.run();
 
         List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_SUCCESS);
 
         assertEquals(1, results.size());
+
+        runner.assertProvenanceEvent(ProvenanceEventType.SEND);
+    }
+
+    @Test
+    void testMissingObjectType() throws Exception {
+        MockRecordParser reader = new MockRecordParser();
+
+        runner.enqueue("");
+
+        configureProcessor(reader);
+
+        runner.run();
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+        assertEquals(1, results.size());
+        assertTrue(runner.getProvenanceEvents().isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeExists("error.message");
+    }
+
+    @Test
+    void testErrorForInvalidRecordField() throws Exception {
+        MockRecordParser reader = new MockRecordParser();
+        reader.addSchemaField("invalidField", RecordFieldType.STRING);
+        reader.addRecord("invalidField");
+
+        runner.enqueue("", Collections.singletonMap("objectType", "Account"));
+
+        configureProcessor(reader);
+
+        runner.run();
+
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+        assertEquals(1, results.size());
+        assertTrue(runner.getProvenanceEvents().isEmpty());
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeExists("error.message");
+    }
+
+    private void configureProcessor(final MockRecordParser reader) throws InitializationException {
+        runner.addControllerService("reader", reader);
+        runner.enableControllerService(reader);
+
+        runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
+        runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+        runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier());
     }
 }
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
index b3abddebbf..6c91c728ff 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -20,6 +20,7 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
 import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
@@ -77,6 +78,7 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware {
         List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
 
         assertNotNull(results.get(0).getContent());
+        runner.assertProvenanceEvent(ProvenanceEventType.RECEIVE);
     }
 
     @Test