Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/12/18 22:08:47 UTC

[GitHub] [nifi] Lehel44 opened a new pull request, #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Lehel44 opened a new pull request, #6794:
URL: https://github.com/apache/nifi/pull/6794

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10966](https://issues.apache.org/jira/browse/NIFI-10966)
   
   - Query Type property added: Query Parameters (original behavior), Custom Query
   - Since we cannot define a schema for custom objects and schema inference can be slow, record processing is not supported in Custom Query mode
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI-10966) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1113585762


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -387,11 +461,63 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile flowFile, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        do {
+            FlowFile outgoingFlowFile;
+            if (flowFile != null) {
+                outgoingFlowFile = session.create(flowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+                session.transfer(outgoingFlowFile, REL_SUCCESS);
+            } catch (IOException e) {
+                session.remove(outgoingFlowFile);
+                throw new ProcessException("Couldn't get Salesforce records", e);
+            }
+        } while (nextRecordsUrl.get() != null);
+        if (flowFile != null) {
+            session.remove(flowFile);
+        }
+    }
+
+    private OutputStreamCallback parseHttpResponse(final InputStream in, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        nextRecordsUrl.set(null);
+        return out -> {
+            try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
+                 JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals(TOTAL_SIZE)) {
+                        jsonParser.nextToken();
+                        totalSize.set(jsonParser.getValueAsString());
+                    } else if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals(NEXT_RECORDS_URL)) {
+                        jsonParser.nextToken();
+                        nextRecordsUrl.set(jsonParser.getValueAsString());
+                    } else if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals(RECORDS)) {
+                        jsonParser.nextToken();
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                    }

Review Comment:
   This is a good idea. I'd leave the fieldName parameter out for now as its value is always JsonToken.FIELD_NAME. Thanks!





[GitHub] [nifi] tpalfy commented on pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on PR #6794:
URL: https://github.com/apache/nifi/pull/6794#issuecomment-1443705766

   LGTM
   Thank you for your work @Lehel44 and your review @pvillard31 !
   Merged into main.




[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by GitBox <gi...@apache.org>.
tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1069710817


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -106,13 +113,54 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue QUERY_PARAMETERS = new AllowableValue("query-parameters", "Query Parameters", "Provide query by parameters.");

Review Comment:
   To me this name was confusing at first. I think for most of us "query parameter" implies the idea of using parameters in a prepared statement or even in an HTTP request.
   
   Also, these aren't really parameters but properties, are they? We are using the properties of the processor to build the query.
   
   With that in mind I'd rename this to PROPERTY_BASED_QUERY or something similar.
   
   BTW both query methods look very vulnerable to injection. Are they not?



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());

Review Comment:
   This onTrigger is getting a little too big.
   
   I would at least move the 2 types of request-result handling logic to their own separate methods.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+        if (isCustomQuery) {
+            String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).getValue();
+            do {
+                FlowFile flowFile = session.create();
+                Map<String, String> attributes = new HashMap<>();
+                try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                    flowFile = session.write(flowFile, parseHttpResponse(response, nextRecordsUrl));
+                    int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize) % 2000;

Review Comment:
   Is there a guarantee that the number of records will always be 2000 in a batch when there are more than that in total?
   
   In any case I'd make a constant out of it.
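   To make the reviewer's point concrete, here is a minimal sketch (all names hypothetical, not from the PR) of the record-count logic with the magic number extracted into a constant. It also shows an edge case worth double-checking in the PR: when the total size is an exact multiple of the batch size, the modulo yields 0 for the final batch.

   ```java
   public class RecordCountSketch {
       // Hypothetical constant for the Salesforce query batch size used in the PR.
       private static final int MAX_RECORD_COUNT = 2000;

       // Mirrors the logic under review: a full batch while there is a next page,
       // otherwise the remainder of the total.
       static int recordCount(boolean hasNextPage, int totalSize) {
           return hasNextPage ? MAX_RECORD_COUNT : totalSize % MAX_RECORD_COUNT;
       }

       public static void main(String[] args) {
           // A non-final page always reports a full batch.
           System.out.println(recordCount(true, 4500));  // 2000
           // The final page of 4500 total records reports the remainder.
           System.out.println(recordCount(false, 4500)); // 500
           // Edge case: an exact multiple yields 0 for the last batch,
           // even though that batch actually contains MAX_RECORD_COUNT records.
           System.out.println(recordCount(false, 4000)); // 0
       }
   }
   ```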



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -194,10 +248,16 @@ public class QuerySalesforceObject extends AbstractProcessor {
     private static final String TIME_FORMAT = "HH:mm:ss.SSSX";
     private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZZZ";
     private static final String NEXT_RECORDS_URL = "nextRecordsUrl";
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String RECORDS = "records";
     private static final BiPredicate<String, String> CAPTURE_PREDICATE = (fieldName, fieldValue) -> NEXT_RECORDS_URL.equals(fieldName);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final String TOTAL_RECORD_COUNT = "total.record.count";
 
     private volatile SalesforceToRecordSchemaConverter salesForceToRecordSchemaConverter;
     private volatile SalesforceRestService salesforceRestService;
+    private volatile String totalSize;

Review Comment:
   I don't think this is a good idea. Not only is it not thread safe (although I'm not sure it makes much sense to run this processor on multiple threads), but having a running method store intermediate state in fields is bad design in general.
   
   The crux of the issue is that the new `parseHttpResponse` method needs to return 3 entirely different things. The output stream callback, this total size and also the next record url.
   If we really don't want to create a POJO for those 3, we could use an Atomic or other wrapper to have the total size in an in-out parameter (similar to how the next record url is handled already).
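   A minimal sketch of the POJO alternative mentioned above (the class and member names are illustrative, not from the PR): instead of threading multiple `AtomicReference` in/out parameters or a processor field, `parseHttpResponse` could populate a small mutable holder for the paging state.

   ```java
   // Hypothetical holder for the paging state a response parser needs to hand
   // back: the next-page URL and the reported total size.
   public class QueryPageState {
       private String nextRecordsUrl;
       private String totalSize;

       public String getNextRecordsUrl() { return nextRecordsUrl; }
       public void setNextRecordsUrl(String url) { this.nextRecordsUrl = url; }

       public String getTotalSize() { return totalSize; }
       public void setTotalSize(String size) { this.totalSize = size; }

       // Paging continues while the last response carried a nextRecordsUrl.
       public boolean hasMorePages() { return nextRecordsUrl != null; }

       public static void main(String[] args) {
           QueryPageState state = new QueryPageState();
           // Before any response is parsed, there is no next page.
           System.out.println(state.hasMorePages()); // false
           state.setNextRecordsUrl("next-page-url");
           state.setTotalSize("4500");
           System.out.println(state.hasMorePages()); // true
       }
   }
   ```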





[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1115841881


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -387,11 +461,78 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile originalFlowFile) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();
+        boolean isOriginalTransferred = false;
+        List<FlowFile> flowFiles = new ArrayList<>();
+        do {
+            FlowFile outgoingFlowFile;
+            if (originalFlowFile != null) {
+                outgoingFlowFile = session.create(originalFlowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+                flowFiles.add(outgoingFlowFile);
+            } catch (IOException e) {
+                throw new ProcessException("Couldn't get Salesforce records", e);
+            } catch (Exception e) {
+                if (originalFlowFile != null) {
+                    session.transfer(originalFlowFile, REL_FAILURE);
+                    isOriginalTransferred = true;
+                }
+                getLogger().error("Couldn't get Salesforce records", e);
+                session.remove(flowFiles);
+                flowFiles.clear();
+                break;
+            }
+        } while (nextRecordsUrl.get() != null);
+
+        if (!flowFiles.isEmpty()) {
+            session.transfer(flowFiles, REL_SUCCESS);
+        }
+        if (originalFlowFile != null && !isOriginalTransferred) {
+            session.transfer(originalFlowFile, REL_ORIGINAL);
+        }

Review Comment:
   An exception can occur between the creation of the `outgoingFlowFile` and its addition to the `flowFiles` collection. (For example when OAuth fails.) When this happens we don't remove it from the session.
   Here's a suggested fix (contains some other minor changes): 
   ```suggestion
   String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(originalFlowFile).getValue();
           AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
           AtomicReference<String> totalSize = new AtomicReference<>();
   
           boolean isOriginalTransferred = false;
   
           List<FlowFile> outgoingFlowFiles = new ArrayList<>();
           do {
               FlowFile outgoingFlowFile;
               try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
                   if (originalFlowFile != null) {
                       outgoingFlowFile = session.create(originalFlowFile);
                   } else {
                       outgoingFlowFile = session.create();
                   }
                   outgoingFlowFiles.add(outgoingFlowFile);
   
                   outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
   
                   int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
   
                   Map<String, String> attributes = new HashMap<>();
                   attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
                   attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
   
                   session.adjustCounter("Salesforce records processed", recordCount, false);
   
                   session.putAllAttributes(outgoingFlowFile, attributes);
               } catch (IOException e) {
                   throw new ProcessException("Couldn't get Salesforce records", e);
               } catch (Exception e) {
                   if (originalFlowFile != null) {
                       session.transfer(originalFlowFile, REL_FAILURE);
                       isOriginalTransferred = true;
                   }
                   getLogger().error("Couldn't get Salesforce records", e);
                   session.remove(outgoingFlowFiles);
                   outgoingFlowFiles.clear();
                   break;
               }
           } while (nextRecordsUrl.get() != null);
   
           if (!outgoingFlowFiles.isEmpty()) {
               session.transfer(outgoingFlowFiles, REL_SUCCESS);
           }
           if (originalFlowFile != null && !isOriginalTransferred) {
               session.transfer(originalFlowFile, REL_ORIGINAL);
           }
   ```





[GitHub] [nifi] asfgit closed pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "asfgit (via GitHub)" <gi...@apache.org>.
asfgit closed pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query
URL: https://github.com/apache/nifi/pull/6794




[GitHub] [nifi] Lehel44 commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1072970089


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -106,13 +113,54 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue QUERY_PARAMETERS = new AllowableValue("query-parameters", "Query Parameters", "Provide query by parameters.");

Review Comment:
   I agree with the naming. About the SQL Injection I found this in Salesforce docs. I think the room for injection is still there but Salesforce argues it's the user's responsibility by setting the right permissions for everything. What do you think?
   
   "SOQL INJECTION and APIs
   
   The REST and SOAP APIs allow end users to submit arbitrary SOQL strings. However, this does not lead to SOQL injection because the APIs include built in checks for sharing and CRUD/FLS permissions. This means that end users are only allowed to see or modify records and fields that they already have access to. On the other hand, when making SOQL calls in Apex Code, no CRUD/FLS checks are performed (and sharing checks are only performed if the 'with sharing' keyword is used). Therefore it is a serious security vulnerability to allow end users to control the contents of a SOQL query issued in Apex code, but not for end users to control the contents of a SOQL query via the API."





[GitHub] [nifi] pvillard31 commented on pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by GitBox <gi...@apache.org>.
pvillard31 commented on PR #6794:
URL: https://github.com/apache/nifi/pull/6794#issuecomment-1397265977

   Hi - I've been playing with this and it works as expected (except that SOQL is super hard to understand...). One improvement I'd like to see is that the processor should accept input relationships (optional - a bit like ExecuteSQL) and the custom SOQL query should be able to reference flow file attributes. This would give me the option to potentially chain multiple instances together in a flow definition.




[GitHub] [nifi] Lehel44 commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1072982453


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +323,30 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+
+        if (isCustomQuery) {
+            String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).getValue();
+            do {
+                FlowFile flowFile = session.create();
+                Map<String, String> attributes = new HashMap<>();
+                try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                    flowFile = session.write(flowFile, parseHttpResponse(response, nextRecordsUrl));
+                    int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize) % 2000;

Review Comment:
   Yes, 2000 is the maximum number of records.





[GitHub] [nifi] pvillard31 commented on pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "pvillard31 (via GitHub)" <gi...@apache.org>.
pvillard31 commented on PR #6794:
URL: https://github.com/apache/nifi/pull/6794#issuecomment-1433444998

   I've been playing with the latest version and this is working as expected. I'm a +1 from a feature usage point of view.




[GitHub] [nifi] tpalfy commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "tpalfy (via GitHub)" <gi...@apache.org>.
tpalfy commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1108665694


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -224,6 +282,8 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return Collections.unmodifiableList(Arrays.asList(
                 API_URL,

Review Comment:
   When switching between PROPERTY_BASED and CUSTOM_QUERY the list of properties on the processor changes in a confusing manner because dependent and non-dependent properties are intermingled.
   
   Can we group them a bit better and move the non-dependent properties either before or after the dependent ones?
   See example below.
   
   Also CREATE_ZERO_RECORD_FILES doesn't work with CUSTOM_QUERY.
   
   ```java
                   API_URL,
                   API_VERSION,
                   QUERY_TYPE,
                   CUSTOM_SOQL_QUERY,
                   SOBJECT_NAME,
                   FIELD_NAMES,
                   RECORD_WRITER,
                   AGE_FIELD,
                   INITIAL_AGE_FILTER,
                   AGE_DELAY,
                   CUSTOM_WHERE_CONDITION,
                   READ_TIMEOUT,
                   CREATE_ZERO_RECORD_FILES,
                   TOKEN_PROVIDER
   ```



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -106,13 +111,55 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
+    static final AllowableValue CUSTOM_QUERY = new AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()

Review Comment:
   Why can't we use `API_URL` and `API_VERSION` from `CommonSalesforceProperties`?



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -261,6 +321,23 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        boolean isCustomQuery = CUSTOM_QUERY.getValue().equals(context.getProperty(QUERY_TYPE).getValue());
+        AtomicReference<String> nextRecordsUrl = new AtomicReference<>();
+        AtomicReference<String> totalSize = new AtomicReference<>();

Review Comment:
   These `AtomicReference` objects are not used here. I don't think their creation should be the responsibility of `onTrigger`, rather that of the downstream methods.
   
   The `nextRecordsUrl` constructor would be explicitly called in both of those methods, but I wouldn't consider that duplication. Having these closer to the `do...while` blocks they are used in would make their purpose much clearer.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -387,11 +461,63 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile flowFile, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        do {
+            FlowFile outgoingFlowFile;
+            if (flowFile != null) {
+                outgoingFlowFile = session.create(flowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+                session.transfer(outgoingFlowFile, REL_SUCCESS);
+            } catch (IOException e) {
+                session.remove(outgoingFlowFile);
+                throw new ProcessException("Couldn't get Salesforce records", e);
+            }
+        } while (nextRecordsUrl.get() != null);
+        if (flowFile != null) {
+            session.remove(flowFile);

Review Comment:
   Since we _can_ have incoming flowfiles, we should consider adding an ORIGINAL and a FAILURE relationship.
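   For illustration, a sketch of how such relationships are typically declared in NiFi processors (the descriptions here are illustrative assumptions, not the wording the PR adopted):

   ```java
   import org.apache.nifi.processor.Relationship;

   // Illustrative relationship definitions in the usual NiFi builder style.
   static final Relationship REL_ORIGINAL = new Relationship.Builder()
           .name("original")
           .description("The incoming FlowFile is routed here after the query has been executed.")
           .build();

   static final Relationship REL_FAILURE = new Relationship.Builder()
           .name("failure")
           .description("The incoming FlowFile is routed here if the query fails.")
           .build();
   ```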



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -387,11 +461,63 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } while (nextRecordsUrl.get() != null);
     }
 
-    private InputStream getResultInputStream(AtomicReference<String> nextRecordsUrl, String querySObject) {
-        if (nextRecordsUrl.get() == null) {
+    private void processCustomQuery(ProcessContext context, ProcessSession session, FlowFile flowFile, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        String customQuery = context.getProperty(CUSTOM_SOQL_QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        do {
+            FlowFile outgoingFlowFile;
+            if (flowFile != null) {
+                outgoingFlowFile = session.create(flowFile);
+            } else {
+                outgoingFlowFile = session.create();
+            }
+            Map<String, String> attributes = new HashMap<>();
+            try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
+                outgoingFlowFile = session.write(outgoingFlowFile, parseHttpResponse(response, nextRecordsUrl, totalSize));
+                int recordCount = nextRecordsUrl.get() != null ? 2000 : Integer.parseInt(totalSize.get()) % 2000;
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+                attributes.put(TOTAL_RECORD_COUNT, String.valueOf(recordCount));
+                session.adjustCounter("Salesforce records processed", recordCount, false);
+                session.putAllAttributes(outgoingFlowFile, attributes);
+                session.transfer(outgoingFlowFile, REL_SUCCESS);
+            } catch (IOException e) {
+                session.remove(outgoingFlowFile);
+                throw new ProcessException("Couldn't get Salesforce records", e);
+            }
+        } while (nextRecordsUrl.get() != null);
+        if (flowFile != null) {
+            session.remove(flowFile);
+        }
+    }
+
+    private OutputStreamCallback parseHttpResponse(final InputStream in, AtomicReference<String> nextRecordsUrl, AtomicReference<String> totalSize) {
+        nextRecordsUrl.set(null);
+        return out -> {
+            try (JsonParser jsonParser = JSON_FACTORY.createParser(in);
+                 JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals(TOTAL_SIZE)) {
+                        jsonParser.nextToken();
+                        totalSize.set(jsonParser.getValueAsString());
+                    } else if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals(NEXT_RECORDS_URL)) {
+                        jsonParser.nextToken();
+                        nextRecordsUrl.set(jsonParser.getValueAsString());
+                    } else if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals(RECORDS)) {
+                        jsonParser.nextToken();
+                        jsonGenerator.copyCurrentStructure(jsonParser);
+                    }

Review Comment:
   Bit of a code duplication here.
   I would consider something like
   ```suggestion
                   while (jsonParser.nextToken() != null) {
                       if (nextTokenIs(jsonParser, JsonToken.FIELD_NAME, TOTAL_SIZE)) {
                           totalSize.set(jsonParser.getValueAsString());
                       } else if (nextTokenIs(jsonParser, JsonToken.FIELD_NAME, NEXT_RECORDS_URL)) {
                           nextRecordsUrl.set(jsonParser.getValueAsString());
                       } else if (nextTokenIs(jsonParser, JsonToken.FIELD_NAME, RECORDS)) {
                           jsonGenerator.copyCurrentStructure(jsonParser);
                       }
   ```
   with
   ```java
       private boolean nextTokenIs(JsonParser jsonParser, JsonToken fieldName, String value) throws IOException {
           return jsonParser.getCurrentToken() == fieldName && jsonParser.getCurrentName()
                   .equals(value) && jsonParser.nextToken() != null;
       }
   ```
   This is a bit uncommon: the boolean method not only performs a check but also advances to the next token when the check evaluates to true. That kind of behaviour is usually acceptable in parsers, though.
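   The `&&` short-circuit is what makes the suggested helper safe: the cursor only advances when the name check has already matched, so a non-matching call leaves the parser where it was. A self-contained illustration, with a hypothetical `TokenCursor` standing in for Jackson's `JsonParser` (this class is invented for the sketch, not part of the PR):

```java
import java.util.Iterator;
import java.util.List;

// Minimal stand-in for a streaming parser cursor: current() peeks at the
// current token, advance() consumes the next one. nextTokenIs mirrors the
// reviewer's suggestion: thanks to && short-circuiting, advance() only runs
// when the name check already matched.
public class TokenCursor {
    private final Iterator<String> it;
    private String current;

    public TokenCursor(List<String> tokens) {
        it = tokens.iterator();
        advance();
    }

    public String current() { return current; }

    public boolean advance() {
        current = it.hasNext() ? it.next() : null;
        return current != null;
    }

    // Check the current token and consume the next one only on a match.
    public boolean nextTokenIs(String name) {
        return name.equals(current) && advance();
    }

    public static void main(String[] args) {
        TokenCursor c = new TokenCursor(List.of("totalSize", "42", "records"));
        System.out.println(c.nextTokenIs("nextRecordsUrl")); // false, cursor unmoved
        System.out.println(c.nextTokenIs("totalSize"));      // true, cursor advanced
        System.out.println(c.current());                     // 42
    }
}
```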



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -162,6 +212,7 @@ public class QuerySalesforceObject extends AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .dependsOn(AGE_FIELD)
+            .dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)

Review Comment:
   Currently CUSTOM_QUERY does not support incremental loading. That is probably intentional, but in that case we should emphasize it in the `additionalDetails.html` page and maybe even in the `@CapabilityDescription`.



##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/resources/docs/org.apache.nifi.processors.salesforce.QuerySalesforceObject/additionalDetails.html:
##########
@@ -34,8 +34,8 @@ <h3>Description</h3>
 
 <p>
     Objects in Salesforce are database tables, their rows are known as records, and their columns are called fields. The QuerySalesforceObject processor queries Salesforce objects and retrieves their records.
-    The processor constructs the query using SOQL (Salesforce Object Query Language) and retrieves the result record dataset using the Salesforce REST API.
-    The processor utilizes streams and NiFi record-based processing to be able to handle a large number of records and to allow arbitrary output format.
+    The processor constructs the query from parameters or executes a custom SOQL (Salesforce Object Query Language) query and retrieves the result record dataset using the Salesforce REST API.

Review Comment:
   ```suggestion
       The processor constructs the query from processor properties or executes a custom SOQL (Salesforce Object Query Language) query and retrieves the result record dataset using the Salesforce REST API.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #6794: NIFI-10966: Add option to QuerySalesforceObject to run custom query

Posted by "Lehel44 (via GitHub)" <gi...@apache.org>.
Lehel44 commented on code in PR #6794:
URL: https://github.com/apache/nifi/pull/6794#discussion_r1113028736


##########
nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java:
##########
@@ -106,13 +111,55 @@
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class QuerySalesforceObject extends AbstractProcessor {
 
+    static final AllowableValue PROPERTY_BASED_QUERY = new AllowableValue("property-based-query", "Property Based Query", "Provide query by properties.");
+    static final AllowableValue CUSTOM_QUERY = new AllowableValue("custom-query", "Custom Query", "Provide custom SOQL query.");
+
+    static final PropertyDescriptor API_URL = new PropertyDescriptor.Builder()

Review Comment:
   We can use them, thanks for pointing it out.


