You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2023/02/28 18:26:58 UTC
[nifi] branch main updated: NIFI-11158 PutSalesforceObject processor improvements
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 60c02225d5 NIFI-11158 PutSalesforceObject processor improvements
60c02225d5 is described below
commit 60c02225d5694efe8f6dab151aa441448fb215c7
Author: krisztina-zsihovszki <zs...@gmail.com>
AuthorDate: Wed Feb 15 12:37:31 2023 +0100
NIFI-11158 PutSalesforceObject processor improvements
This closes #6959.
Reviewed-by: Lehel <le...@hotmail.com>
Reviewed-by: Mark Bathori <ba...@gmail.com>
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi/util/StandardProcessorTestRunner.java | 13 ++++-
.../main/java/org/apache/nifi/util/TestRunner.java | 7 +++
.../processors/salesforce/PutSalesforceObject.java | 34 ++++++++---
.../salesforce/QuerySalesforceObject.java | 8 ++-
.../processors/salesforce/util/RecordExtender.java | 4 +-
.../salesforce/util/SalesforceRestService.java | 15 ++++-
.../salesforce/PutSalesforceObjectIT.java | 67 +++++++++++++++++++---
.../salesforce/QuerySalesforceObjectIT.java | 2 +
8 files changed, 126 insertions(+), 24 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 63290be359..c113817411 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
@@ -72,6 +73,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class StandardProcessorTestRunner implements TestRunner {
@@ -366,7 +368,7 @@ public class StandardProcessorTestRunner implements TestRunner {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
)
- .collect(Collectors.toSet());
+ .collect(toSet());
assertEquals(expectedAttributes, actualAttributes);
}
@@ -1055,4 +1057,13 @@ public class StandardProcessorTestRunner implements TestRunner {
public void setRunSchedule(long runSchedule) {
this.runSchedule = runSchedule;
}
+
+ @Override
+ public void assertProvenanceEvent(final ProvenanceEventType eventType) {
+ Set<ProvenanceEventType> expectedEventTypes = Collections.singleton(eventType);
+ Set<ProvenanceEventType> actualEventTypes = getProvenanceEvents().stream()
+ .map(ProvenanceEventRecord::getEventType)
+ .collect(toSet());
+ assertEquals(expectedEventTypes, actualEventTypes);
+ }
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 7b01ed9709..dbe31e73c8 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -28,6 +28,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.state.MockStateManager;
@@ -1062,4 +1063,10 @@ public interface TestRunner {
*/
void setRunSchedule(long runSchedule);
+ /**
+ * Assert that provenance event was created with the specified event type.
+ *
+ * @param eventType Provenance event type
+ */
+ void assertProvenanceEvent(ProvenanceEventType eventType);
}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
index 00d0c2b783..83605800f9 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/PutSalesforceObject.java
@@ -20,7 +20,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.NullSuppression;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -64,9 +66,13 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@CapabilityDescription("Creates new records for the specified Salesforce sObject. The type of the Salesforce object must be set in the input flowfile's"
+ " 'objectType' attribute. This processor cannot update existing records.")
@ReadsAttribute(attribute = "objectType", description = "The Salesforce object type to upload records to. E.g. Account, Contact, Campaign.")
+@WritesAttribute(attribute = "error.message", description = "The error message returned by Salesforce.")
+@SeeAlso(QuerySalesforceObject.class)
public class PutSalesforceObject extends AbstractProcessor {
private static final int MAX_RECORD_COUNT = 200;
+ private static final String ATTR_OBJECT_TYPE = "objectType";
+ private static final String ATTR_ERROR_MESSAGE = "error.message";
protected static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
@@ -138,15 +144,19 @@ public class PutSalesforceObject extends AbstractProcessor {
return;
}
- String objectType = flowFile.getAttribute("objectType");
+ String objectType = flowFile.getAttribute(ATTR_OBJECT_TYPE);
if (objectType == null) {
- throw new ProcessException("Salesforce object type not found among the incoming flowfile attributes");
+ getLogger().error("Salesforce object type not found among the incoming FlowFile attributes");
+ flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, "Salesforce object type not found among FlowFile attributes");
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ return;
}
RecordReaderFactory readerFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
RecordExtender extender;
-
+ long startNanos = System.nanoTime();
+ try {
try (InputStream in = session.read(flowFile);
RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -170,24 +180,30 @@ public class PutSalesforceObject extends AbstractProcessor {
out.reset();
}
}
-
if (writer.isActiveRecordSet()) {
processRecords(objectType, out, writer, extender);
}
- session.transfer(flowFile, REL_SUCCESS);
-
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+ long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, salesforceRestService.getVersionedBaseUrl()+ "/composite/tree/" + objectType, transferMillis);
} catch (MalformedRecordException e) {
getLogger().error("Couldn't read records from input", e);
- session.transfer(flowFile, REL_FAILURE);
+ transferToFailure(session, flowFile, e);
} catch (SchemaNotFoundException e) {
getLogger().error("Couldn't create record writer", e);
- session.transfer(flowFile, REL_FAILURE);
+ transferToFailure(session, flowFile, e);
} catch (Exception e) {
getLogger().error("Failed to put records to Salesforce.", e);
- session.transfer(flowFile, REL_FAILURE);
+ transferToFailure(session, flowFile, e);
}
}
+ private void transferToFailure(ProcessSession session, FlowFile flowFile, Exception e) {
+ flowFile = session.putAttribute(flowFile, ATTR_ERROR_MESSAGE, e.getMessage());
+ session.transfer(session.penalize(flowFile), REL_FAILURE);
+ }
+
private void processRecords(String objectType, ByteArrayOutputStream out, WriteJsonResult writer, RecordExtender extender) throws IOException {
writer.finishRecordSet();
writer.flush();
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
index 169e3cdd8b..edea559560 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
@@ -115,6 +116,7 @@ import static org.apache.nifi.processors.salesforce.util.CommonSalesforcePropert
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the FlowFile.")
})
@DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
+@SeeAlso(PutSalesforceObject.class)
public class QuerySalesforceObject extends AbstractProcessor {
static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
@@ -398,7 +400,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
Map<String, String> attributes = new HashMap<>();
AtomicInteger recordCountHolder = new AtomicInteger();
-
+ long startNanos = System.nanoTime();
flowFile = session.write(flowFile, out -> {
try (
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
@@ -465,6 +467,10 @@ public class QuerySalesforceObject extends AbstractProcessor {
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
+ long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().receive(flowFile, salesforceRestService.getVersionedBaseUrl() + "/composite/tree/" + sObject,
+ transferMillis);
+
session.adjustCounter("Records Processed", recordCount, false);
getLogger().info("Successfully written {} records for {}", recordCount, flowFile);
}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java
index 9e8b03e9d5..6af98cd725 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/RecordExtender.java
@@ -66,7 +66,9 @@ public class RecordExtender {
public MapRecord getExtendedRecord(String objectType, int count, Record record) {
- Set<String> rawFieldNames = record.getRawFieldNames();
+ Set<String> rawFieldNames = record.getRawFieldNames().stream()
+ .filter(fieldName -> record.getValue(fieldName) != null)
+ .collect(Collectors.toSet());
Map<String, Object> objectMap = rawFieldNames.stream()
.collect(Collectors.toMap(Function.identity(), record::getValue));
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
index cc3f4b2bf8..0affbc441e 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/util/SalesforceRestService.java
@@ -44,7 +44,7 @@ public class SalesforceRestService {
}
public InputStream describeSObject(String sObject) {
- String url = baseUrl + "/services/data/v" + version + "/sobjects/" + sObject + "/describe?maxRecords=1";
+ String url = getVersionedBaseUrl() + "/sobjects/" + sObject + "/describe?maxRecords=1";
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + accessTokenProvider.get())
@@ -56,7 +56,7 @@ public class SalesforceRestService {
}
public InputStream query(String query) {
- String url = baseUrl + "/services/data/v" + version + "/query";
+ String url = getVersionedBaseUrl() + "/query";
HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
.addQueryParameter("q", query)
@@ -87,7 +87,7 @@ public class SalesforceRestService {
}
public InputStream postRecord(String sObjectApiName, String body) {
- String url = baseUrl + "/services/data/v" + version + "/composite/tree/" + sObjectApiName;
+ String url = getVersionedBaseUrl() + "/composite/tree/" + sObjectApiName;
HttpUrl httpUrl = HttpUrl.get(url).newBuilder()
.build();
@@ -103,6 +103,10 @@ public class SalesforceRestService {
return request(request);
}
+ public String getVersionedBaseUrl() {
+ return baseUrl + "/services/data/v" + version;
+ }
+
private InputStream request(Request request) {
Response response = null;
try {
@@ -115,6 +119,11 @@ public class SalesforceRestService {
);
}
return response.body().byteStream();
+ } catch (ProcessException e) {
+ if (response != null) {
+ response.close();
+ }
+ throw e;
} catch (Exception e) {
if (response != null) {
response.close();
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
index 390fa8ac0f..2c296a119f 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/PutSalesforceObjectIT.java
@@ -20,6 +20,8 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
@@ -32,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class PutSalesforceObjectIT implements SalesforceConfigAware {
@@ -62,25 +65,71 @@ class PutSalesforceObjectIT implements SalesforceConfigAware {
reader.addSchemaField("numberOfEmployees", RecordFieldType.STRING);
reader.addSchemaField("industry", RecordFieldType.STRING);
- reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", "100", "Banking");
- reader.addRecord("SampleAccount2", "222222", "www.salesforce2.com", "200", "Banking");
+ reader.addRecord("SampleAccount1", "111111", "www.salesforce1.com", null, "Banking");
+ reader.addRecord("SampleAccount2", null, "www.salesforce2.com", "200", "Banking");
reader.addRecord("SampleAccount3", "333333", "www.salesforce3.com", "300", "Banking");
- reader.addRecord("SampleAccount4", "444444", "www.salesforce4.com", "400", "Banking");
+ reader.addRecord("SampleAccount4", "444444", null, "400", "Banking");
reader.addRecord("SampleAccount5", "555555", "www.salesforce5.com", "500", "Banking");
runner.enqueue("", Collections.singletonMap("objectType", "Account"));
- runner.addControllerService("reader", reader);
- runner.enableControllerService(reader);
-
- runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
- runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
- runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier());
+ configureProcessor(reader);
runner.run();
List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_SUCCESS);
assertEquals(1, results.size());
+
+ runner.assertProvenanceEvent(ProvenanceEventType.SEND);
+ }
+
+ @Test
+ void testMissingObjectType() throws Exception {
+ MockRecordParser reader = new MockRecordParser();
+
+ runner.enqueue("");
+
+ configureProcessor(reader);
+
+ runner.run();
+
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+ assertEquals(1, results.size());
+ assertTrue(runner.getProvenanceEvents().isEmpty());
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+ MockFlowFile ff0 = flowFiles.get(0);
+ ff0.assertAttributeExists("error.message");
+ }
+
+ @Test
+ void testErrorForInvalidRecordField() throws Exception {
+ MockRecordParser reader = new MockRecordParser();
+ reader.addSchemaField("invalidField", RecordFieldType.STRING);
+ reader.addRecord("invalidField");
+
+ runner.enqueue("", Collections.singletonMap("objectType", "Account"));
+
+ configureProcessor(reader);
+
+ runner.run();
+
+ List<MockFlowFile> results = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+ assertEquals(1, results.size());
+ assertTrue(runner.getProvenanceEvents().isEmpty());
+
+ List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutSalesforceObject.REL_FAILURE);
+ MockFlowFile ff0 = flowFiles.get(0);
+ ff0.assertAttributeExists("error.message");
+ }
+
+ private void configureProcessor(final MockRecordParser reader) throws InitializationException {
+ runner.addControllerService("reader", reader);
+ runner.enableControllerService(reader);
+
+ runner.setProperty(CommonSalesforceProperties.API_VERSION, VERSION);
+ runner.setProperty(CommonSalesforceProperties.API_URL, BASE_URL);
+ runner.setProperty(PutSalesforceObject.RECORD_READER_FACTORY, reader.getIdentifier());
}
}
diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
index b3abddebbf..6c91c728ff 100644
--- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
+++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/test/java/org/apache/nifi/processors/salesforce/QuerySalesforceObjectIT.java
@@ -20,6 +20,7 @@ import org.apache.nifi.oauth2.StandardOauth2AccessTokenProvider;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.salesforce.util.CommonSalesforceProperties;
import org.apache.nifi.processors.salesforce.util.SalesforceConfigAware;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
@@ -77,6 +78,7 @@ class QuerySalesforceObjectIT implements SalesforceConfigAware {
List<MockFlowFile> results = runner.getFlowFilesForRelationship(QuerySalesforceObject.REL_SUCCESS);
assertNotNull(results.get(0).getContent());
+ runner.assertProvenanceEvent(ProvenanceEventType.RECEIVE);
}
@Test