Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/08 20:32:23 UTC

[GitHub] [nifi] Lehel44 opened a new pull request, #6379: NIFI-10463: Fix GetHubSpot incremental loading

Lehel44 opened a new pull request, #6379:
URL: https://github.com/apache/nifi/pull/6379

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10463](https://issues.apache.org/jira/browse/NIFI-10463)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI-10463) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] Lehel44 commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r978444386


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);

Review Comment:
   I think it's necessary because the user can switch to another object type and later return to the previous one, which would otherwise overwrite the state.
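
The concern can be illustrated with a small sketch. It assumes the state is a plain `Map<String, String>` and reuses the `"start: %s"` and `"end: %s"` key patterns from the quoted diff; the class name, the helper `recordWindow`, and the object type values `contacts` and `deals` are hypothetical and are not taken from the processor.

```java
import java.util.HashMap;
import java.util.Map;

final class PerObjectTypeStateSketch {
    // Key patterns as they appear in the quoted diff.
    static final String START_KEY_PATTERN = "start: %s";
    static final String END_KEY_PATTERN = "end: %s";

    // Hypothetical helper: stores a time window under keys suffixed with the object type.
    static void recordWindow(final Map<String, String> state, final String objectType,
                             final long startMs, final long endMs) {
        state.put(String.format(START_KEY_PATTERN, objectType), String.valueOf(startMs));
        state.put(String.format(END_KEY_PATTERN, objectType), String.valueOf(endMs));
    }

    public static void main(final String[] args) {
        final Map<String, String> state = new HashMap<>();
        recordWindow(state, "contacts", 1_000L, 2_000L);
        // Switching to another object type writes different keys...
        recordWindow(state, "deals", 5_000L, 6_000L);
        // ...so the window recorded for "contacts" is still available afterwards.
        System.out.println(state.get("end: contacts")); // prints 2000
        System.out.println(state.size());               // prints 4
    }
}
```

With a single unsuffixed key, the second `recordWindow` call would replace the window stored by the first one.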





[GitHub] [nifi] Lehel44 commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r978438425


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = stateMap.getOrDefault(endIncrementalKey, context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");

Review Comment:
   Nice catch! Thanks.
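
For context, the line under discussion sets the lower-bound operator to `GT`, while the later revision quoted further down in this thread uses `GTE` together with `LT`. The sketch below is only an illustration of why an inclusive lower bound matters when consecutive windows share a boundary. The timestamps, the class name, and the `select` method are hypothetical, and the code filters an in-memory list rather than calling the HubSpot API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimeWindowSketch {
    // Collects the timestamps matched by consecutive windows between the given boundaries.
    // The upper bound is always exclusive (LT); the lower bound is GTE or GT.
    static List<Long> select(final List<Long> timestamps, final long[] boundaries,
                             final boolean inclusiveStart) {
        final List<Long> selected = new ArrayList<>();
        for (int i = 0; i + 1 < boundaries.length; i++) {
            final long start = boundaries[i];
            final long end = boundaries[i + 1];
            for (final long ts : timestamps) {
                final boolean afterStart = inclusiveStart ? ts >= start : ts > start;
                if (afterStart && ts < end) {
                    selected.add(ts);
                }
            }
        }
        return selected;
    }

    public static void main(final String[] args) {
        final List<Long> modified = Arrays.asList(100L, 200L, 250L);
        final long[] boundaries = {100L, 200L, 300L};
        // GT + LT: items modified exactly on a boundary are never returned.
        System.out.println(select(modified, boundaries, false)); // prints [250]
        // GTE + LT: every item is returned exactly once.
        System.out.println(select(modified, boundaries, true));  // prints [100, 200, 250]
    }
}
```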





[GitHub] [nifi] Lehel44 commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r979479849


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);
+
+            final String currentStartTime;
+            final String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
+
+                stateMap.put(START_INCREMENTAL_KEY, currentStartTime);
+                stateMap.put(END_INCREMENTAL_KEY, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GTE");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
     }
 
-    private StateMap getStateMap(final ProcessContext context) {
+    private String getInitialStartTimeEpoch(String initialStartTimeValue) {
+        if (initialStartTimeValue != null) {
+            final TemporalAccessor initialDateTime = DateTimeFormatter.ISO_DATE_TIME.parse(initialStartTimeValue);
+            return String.valueOf(Instant.from(initialDateTime).toEpochMilli());
+        }
+        return null;
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
+    }
+
+    private Map<String, String> getStateMap(final ProcessSession session) {
         final StateMap stateMap;
         try {
-            stateMap = context.getStateManager().getState(Scope.CLUSTER);
+            stateMap = session.getState(Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("State retrieval failed", e);
         }
-        return stateMap;
+        return new HashMap<>(stateMap.toMap());
     }
 
-    private void updateState(ProcessContext context, Map<String, String> newState) {
+    private void updateState(ProcessSession session, Map<String, String> newState) {
         try {
-            context.getStateManager().setState(newState, Scope.CLUSTER);
+            session.setState(newState, Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("Page cursor update failed", e);
         }
     }
+
+    private void clearState(ProcessSession session) {
+        try {
+            session.clearState(Scope.CLUSTER);

Review Comment:
   It removes the items when the session gets committed. I think that's why calling `clearState` at the beginning of `onTrigger` won't work.
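
The behaviour described above can be pictured with a toy model. The sketch does not use or reproduce the NiFi API: `ToyStore`, `ToySession`, and the `paging_next` key are hypothetical stand-ins, and the only assumption modelled is the one stated in the comment, namely that a clear requested through the session reaches the stored state when the session is committed.

```java
import java.util.HashMap;
import java.util.Map;

final class DeferredClearSketch {
    // Hypothetical stand-in for the persisted component state.
    static final class ToyStore {
        final Map<String, String> values = new HashMap<>();
    }

    // Hypothetical stand-in for a session that buffers a clear request until commit.
    static final class ToySession {
        private final ToyStore store;
        private boolean clearRequested;

        ToySession(final ToyStore store) {
            this.store = store;
        }

        void clearState() {
            clearRequested = true; // nothing is removed from the store yet
        }

        void commit() {
            if (clearRequested) {
                store.values.clear();
                clearRequested = false;
            }
        }
    }

    public static void main(final String[] args) {
        final ToyStore store = new ToyStore();
        store.values.put("paging_next", "42");

        final ToySession session = new ToySession(store);
        session.clearState();
        // The stored value is still present because the session has not been committed.
        System.out.println(store.values.containsKey("paging_next")); // prints true
        session.commit();
        // After the commit the store is empty.
        System.out.println(store.values.isEmpty()); // prints true
    }
}
```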





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r979460593


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);

Review Comment:
   Similar to the `time_window_*` state attributes, please remove the object type suffix here too.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);
+
+            final String currentStartTime;
+            final String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
+
+                stateMap.put(START_INCREMENTAL_KEY, currentStartTime);
+                stateMap.put(END_INCREMENTAL_KEY, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GTE");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
     }
 
-    private StateMap getStateMap(final ProcessContext context) {
+    private String getInitialStartTimeEpoch(String initialStartTimeValue) {
+        if (initialStartTimeValue != null) {
+            final TemporalAccessor initialDateTime = DateTimeFormatter.ISO_DATE_TIME.parse(initialStartTimeValue);
+            return String.valueOf(Instant.from(initialDateTime).toEpochMilli());
+        }
+        return null;
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
+    }
+
+    private Map<String, String> getStateMap(final ProcessSession session) {
         final StateMap stateMap;
         try {
-            stateMap = context.getStateManager().getState(Scope.CLUSTER);
+            stateMap = session.getState(Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("State retrieval failed", e);
         }
-        return stateMap;
+        return new HashMap<>(stateMap.toMap());
     }
 
-    private void updateState(ProcessContext context, Map<String, String> newState) {
+    private void updateState(ProcessSession session, Map<String, String> newState) {
         try {
-            context.getStateManager().setState(newState, Scope.CLUSTER);
+            session.setState(newState, Scope.CLUSTER);
         } catch (IOException e) {
             throw new ProcessException("Page cursor update failed", e);
         }
     }
+
+    private void clearState(ProcessSession session) {
+        try {
+            session.clearState(Scope.CLUSTER);

Review Comment:
   For some reason this does not remove the previous entries from the state.
   `context.getStateManager().clear(Scope.CLUSTER)` might be a better option for clearing.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);
+
+            final String currentStartTime;
+            final String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());

Review Comment:
   I think it is better to avoid this situation: the state should also be reset when the user changes `Incremental Loading` (similar to `Object Type`).
   In that case `currentEndTime = lastEndTime` would be enough, because `lastEndTime` must have a value whenever the paging cursor is set.
   Otherwise `Incremental Delay` should be applied here too.
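
The window selection described in this comment can be sketched in plain Java as follows. This is only an illustration, not the processor's code: the class name, the state key names, the `NO_PAGING` value and the `nextWindow` helper are all hypothetical, and the sketch assumes the state is reset whenever `Incremental Loading` is toggled, so an end time is always stored while a paging cursor is set.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; key names and the NO_PAGING value are hypothetical.
final class IncrementalWindowSketch {
    static final String CURSOR_KEY = "paging_next";
    static final String START_KEY = "time_window_start";
    static final String END_KEY = "time_window_end";
    static final String NO_PAGING = "no paging";

    /** Returns {start, end} as epoch-millisecond strings; start may be null on the very first run. */
    static String[] nextWindow(final Map<String, String> state, final String initialStartEpoch,
                               final long nowMs, final long delayMs) {
        final String cursor = state.get(CURSOR_KEY);
        if (cursor != null && !NO_PAGING.equals(cursor)) {
            // Still paging through the previous window: reuse it unchanged.
            return new String[] {state.get(START_KEY), state.get(END_KEY)};
        }
        // New window: start where the previous one ended, or at the configured initial start time.
        final String lastEndTime = state.get(END_KEY);
        final String start = lastEndTime != null ? lastEndTime : initialStartEpoch;
        // The delay shifts the end of the window earlier to tolerate clock skew.
        final String end = String.valueOf(nowMs - delayMs);
        if (start != null) {
            state.put(START_KEY, start);
        }
        state.put(END_KEY, end);
        return new String[] {start, end};
    }

    public static void main(final String[] args) {
        final Map<String, String> state = new HashMap<>();
        final String[] window = nextWindow(state, "1000", 10_000L, 3_000L);
        System.out.println(window[0] + " .. " + window[1]);
    }
}
```

With this shape the paging branch needs no fallback to the current time, which is the simplification the comment asks for.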



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +106,43 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects within a time window where the objects were modified between" +
+                    " the previous run time and the current time (optionally adjusted by the Incremental Delay property).")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description(("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35." +
+                    " Set this property to avoid missing objects when the clock of your local machines and HubSpot servers' clock are not in sync."))
+            .required(true)
+            .defaultValue("3 sec")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new PropertyDescriptor.Builder()
+            .name("incremental-initial-start-time")
+            .displayName("Incremental Initial Start Time")
+            .description("This property specifies the start time as Epoch Time that the processor applies when running the first request." +
+                    " The expected format is an ISO-like date-time with the offset and zone if available, such as '2011-12-03T10:15:30'," +
+                    " '2011-12-03T10:15:30+01:00' or '2011-12-03T10:15:30+01:00[Europe/Paris]'")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)

Review Comment:
   It is not possible to enter a date-time value because of the `Long` validator.
   `ISO8601_INSTANT_VALIDATOR` could be used. It accepts only UTC time (e.g. `2011-12-03T10:15:30Z`), so the examples in the description need to be adjusted in that case.
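
A small standalone check, using only `java.time`, may help when adjusting the examples in the description. It is a sketch under the assumption that the property value ends up being converted to an `Instant`, as `getInitialStartTimeEpoch` does in the diff; the class and method names below are hypothetical. It shows that a UTC instant such as `2011-12-03T10:15:30Z` converts cleanly, while a value without an offset (the first example in the current description) cannot be turned into an `Instant` at all.

```java
import java.time.DateTimeException;
import java.time.Instant;
import java.time.format.DateTimeFormatter;

// Illustrative sketch only; not part of the processor.
final class StartTimeParsingSketch {

    /** Converts a UTC ISO-8601 instant such as 2011-12-03T10:15:30Z to epoch milliseconds. */
    static long utcInstantToEpochMilli(final String value) {
        return Instant.parse(value).toEpochMilli();
    }

    /** Reports whether the ISO_DATE_TIME based conversion used in the diff would succeed for the value. */
    static boolean isConvertibleWithIsoDateTime(final String value) {
        try {
            Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse(value));
            return true;
        } catch (final DateTimeException e) {
            // Covers both unparseable text and values without an offset or zone.
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(utcInstantToEpochMilli("2011-12-03T10:15:30Z"));
        System.out.println(isConvertibleWithIsoDateTime("2011-12-03T10:15:30"));
        System.out.println(isConvertibleWithIsoDateTime("2011-12-03T10:15:30+01:00"));
    }
}
```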



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);

Review Comment:
   It is quite hard to understand why `lastEndTime` is initialized with `initialStartTimeEpoch`.
   It would be better to assign the End Time value from the state only (which can be null), and use the Initial Start Time when `currentStartTime` is calculated in line 348:
   ```
   currentStartTime = lastEndTime != null ? lastEndTime : getInitialStartTimeEpoch(initialStartTimeValue);
   ```
   In this case the epoch is calculated only when needed (once).
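
As a rough standalone illustration of this suggestion, the fallback can be isolated in a small helper so that the conversion only runs when no end time is stored. The class and method names are hypothetical, and the sketch assumes the configured value is a UTC ISO-8601 instant, which is an assumption and not something the PR guarantees.

```java
import java.time.Instant;

// Illustrative sketch only; names are hypothetical.
final class LazyStartTimeSketch {

    /** Converts a UTC ISO-8601 instant to an epoch-millisecond string, or returns null if unset. */
    static String toEpochMilli(final String isoInstant) {
        return isoInstant == null ? null : String.valueOf(Instant.parse(isoInstant).toEpochMilli());
    }

    /** Prefers the stored end time; falls back to the configured initial start time only when needed. */
    static String resolveStartTime(final String lastEndTime, final String initialStartTimeValue) {
        return lastEndTime != null ? lastEndTime : toEpochMilli(initialStartTimeValue);
    }

    public static void main(final String[] args) {
        System.out.println(resolveStartTime(null, "2011-12-03T10:15:30Z"));
        System.out.println(resolveStartTime("7000", "2011-12-03T10:15:30Z"));
    }
}
```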



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +106,43 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects within a time window where the objects were modified between" +
+                    " the previous run time and the current time (optionally adjusted by the Incremental Delay property).")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description(("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35." +
+                    " Set this property to avoid missing objects when the clock of your local machines and HubSpot servers' clock are not in sync."))
+            .required(true)
+            .defaultValue("3 sec")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_INITIAL_START_TIME = new PropertyDescriptor.Builder()
+            .name("incremental-initial-start-time")
+            .displayName("Incremental Initial Start Time")
+            .description("This property specifies the start time as Epoch Time that the processor applies when running the first request." +

Review Comment:
   ```suggestion
               .description("This property specifies the start time that the processor applies when running the first request." +
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }

Review Comment:
   ```suggestion
           final String cursor = stateMap.get(cursorKey);
           if (cursor != null && !NO_PAGING.equals(cursor)) {
               root.put(PAGING_CURSOR, cursor);
           }
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -47,31 +48,37 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.web.client.api.HttpResponseEntity;
 import org.apache.nifi.web.client.api.HttpResponseStatus;
-import org.apache.nifi.web.client.api.HttpUriBuilder;
 import org.apache.nifi.web.client.provider.api.WebClientServiceProvider;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAccessor;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 @PrimaryNodeOnly
 @TriggerSerially
-@TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hubspot"})
 @CapabilityDescription("Retrieves JSON data from a private HubSpot application."
-        + " Configuring the Result Limit property enables incremental retrieval of results. When this property is set the processor will"
-        + " retrieve new records. This processor is intended to be run on the Primary Node only.")
-@Stateful(scopes = Scope.CLUSTER, description = "When the 'Limit' attribute is set, the paging cursor is saved after executing a request."
-        + " Only the objects after the paging cursor will be retrieved. The maximum number of retrieved objects is the 'Limit' attribute.")
+        + " This processor is intended to be run on the Primary Node only.")
+@Stateful(scopes = Scope.CLUSTER, description = "In case of incremental loading the processor run time is stored in the state." +
+        " When the 'Limit' attribute is set, the paging cursor is saved after executing a request. Only the objects after the paging" +
+        " cursor will be retrieved. The maximum number of retrieved objects can be set in the 'Limit' property.")

Review Comment:
   ```suggestion
   @Stateful(scopes = Scope.CLUSTER, description = "In case of incremental loading, the start and end timestamps of the last query time window are stored in the state." +
           " When the 'Result Limit' attribute is set, the paging cursor is saved after executing a request. Only the objects after the paging" +
           " cursor will be retrieved. The maximum number of retrieved objects can be set in the 'Result Limit' property.")
   ```





[GitHub] [nifi] turcsanyip commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r977962708


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java:
##########
@@ -18,97 +18,107 @@
 
 import org.apache.nifi.components.DescribedValue;
 
+import static org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE;
+import static org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE;
+
 public enum HubSpotObjectType implements DescribedValue {
 
     COMPANIES(
             "/crm/v3/objects/companies",
             "Companies",
             "In HubSpot, the companies object is a standard CRM object. Individual company records can be used to store information about businesses" +
-                    " and organizations within company properties."
+                    " and organizations within company properties.",
+            HS_LAST_MODIFIED_DATE
     ),
     CONTACTS(
             "/crm/v3/objects/contacts",
             "Contacts",
             "In HubSpot, contacts store information about individuals. From marketing automation to smart content, the lead-specific data found in" +
-                    " contact records helps users leverage much of HubSpot's functionality."
+                    " contact records helps users leverage much of HubSpot's functionality.",
+            LAST_MODIFIED_DATE
     ),
     DEALS(
             "/crm/v3/objects/deals",
             "Deals",
             "In HubSpot, a deal represents an ongoing transaction that a sales team is pursuing with a contact or company. It’s tracked through" +
-                    " pipeline stages until won or lost."
+                    " pipeline stages until won or lost.",
+            HS_LAST_MODIFIED_DATE
     ),
     FEEDBACK_SUBMISSIONS(

Review Comment:
   The Feedback Submissions API is currently in beta. I'd suggest removing it until it becomes generally available (GA).



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,45 @@
+nifi-airtable-nar
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+**************************
+Apache Software License v2
+**************************
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())

Review Comment:
   I cannot see an `Apache Commons Lang` dependency in the HubSpot bundle. If this is correct, please remove this entry.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -75,6 +82,10 @@
 @DefaultSettings(yieldDuration = "10 sec")
 public class GetHubSpot extends AbstractProcessor {
 
+    static final AllowableValue CREATE_DATE = new AllowableValue("createDate", "Create Date", "The time of the field was created");
+    static final AllowableValue LAST_MODIFIED_DATE = new AllowableValue("lastModifiedDate", "Last Modified Date",
+            "The time of the field was last modified");
+

Review Comment:
   Unused constants.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)

Review Comment:
   I'd suggest making it required with a default value of `3s`, because some difference between local and server times is always expected.
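
   A minimal sketch of the adjustment, using only JDK classes. `QueryWindowSketch` and `adjustWindowEnd` are hypothetical names for illustration and are not part of this PR; the timestamps mirror the example in the property description.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   // Hypothetical helper for illustration only; not code from the pull request.
   final class QueryWindowSketch {

       // Moves the end of the query time window earlier by the configured delay,
       // so that objects stamped by a slightly lagging server clock are not skipped.
       static Instant adjustWindowEnd(final Instant now, final Duration incrementalDelay) {
           return now.minus(incrementalDelay);
       }

       public static void main(final String[] args) {
           final Instant now = Instant.parse("2022-09-22T12:30:45Z");
           // A 10 second delay turns an end time of 12:30:45 into 12:30:35.
           System.out.println(adjustWindowEnd(now, Duration.ofSeconds(10)));
       }
   }
   ```

   The next run would then start its window at the adjusted end time, so the skipped seconds are picked up later rather than lost.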



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
         final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
 
-        final StateMap state = getStateMap(context);
-        final URI uri = createUri(context, state);
+        final URI uri = getBaseUri(context);
 
-        final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri);
-        final AtomicInteger objectCountHolder = new AtomicInteger();
+        final AtomicInteger total = new AtomicInteger(-1);
+        final StateMap state = getStateMap(context);
+        final Map<String, String> stateMap = new HashMap<>(state.toMap());
+        final String filters = createIncrementalFilters(context, stateMap);
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri, filters);
 
         if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
             FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder));
-            if (objectCountHolder.get() > 0) {
+            flowFile = session.write(flowFile, parseHttpResponse(context, response, total, stateMap));
+            if (total.get() > 0) {
                 session.transfer(flowFile, REL_SUCCESS);
+                updateState(context, stateMap);
             } else {
                 getLogger().debug("Empty response when requested HubSpot endpoint: [{}]", endpoint);
                 session.remove(flowFile);

Review Comment:
   The processor should yield in case of an empty response, so that it does not hit the HubSpot API too frequently and unnecessarily when no new data is available.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);

Review Comment:
   `IOUtils.toInputStream(filters, StandardCharsets.UTF_8)` could be used here, with the stream's `available()` method supplying the `contentLength`.
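
   A rough JDK-only sketch of the same idea, in case Commons IO is not wanted as a dependency. `RequestBodySketch`, `toBody` and `contentLength` are hypothetical names, not part of this PR. It relies on `ByteArrayInputStream.available()` returning the number of remaining bytes, which is not guaranteed for arbitrary `InputStream` implementations.

   ```java
   import java.io.ByteArrayInputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.OptionalLong;

   // Hypothetical helper for illustration only; not code from the pull request.
   final class RequestBodySketch {

       // Wraps the JSON filter string in an in-memory stream encoded as UTF-8.
       static ByteArrayInputStream toBody(final String json) {
           return new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
       }

       // For an unread ByteArrayInputStream, available() equals the total byte count,
       // which is the value needed for the content length (bytes, not characters).
       static OptionalLong contentLength(final ByteArrayInputStream body) {
           return OptionalLong.of(body.available());
       }

       public static void main(final String[] args) {
           final ByteArrayInputStream body = toBody("{\"limit\":\"10\"}");
           System.out.println(contentLength(body).getAsLong());
       }
   }
   ```

   Either way, the separate `JsonInputStreamConverter` class would no longer be needed just to carry the stream and its size.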



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
         final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
 
-        final StateMap state = getStateMap(context);
-        final URI uri = createUri(context, state);
+        final URI uri = getBaseUri(context);
 
-        final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri);
-        final AtomicInteger objectCountHolder = new AtomicInteger();
+        final AtomicInteger total = new AtomicInteger(-1);
+        final StateMap state = getStateMap(context);
+        final Map<String, String> stateMap = new HashMap<>(state.toMap());

Review Comment:
   The method could return `Map<String, String>` instead. `StateMap` is not used directly.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -117,18 +160,28 @@ public class GetHubSpot extends AbstractProcessor {
 
     private static final String API_BASE_URI = "api.hubapi.com";
     private static final String HTTPS = "https";
-    private static final String CURSOR_PARAMETER = "after";
-    private static final String LIMIT_PARAMETER = "limit";
     private static final int TOO_MANY_REQUESTS = 429;
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final Map<String, HubSpotObjectType> objectTypeLookupMap = createObjectTypeLookupMap();
+    private static final String NO_PAGING = "no paging";
+    private static final String PAGING_CURSOR = "after";
+    private static final String CURSOR_KEY_PATTERN = "paging_next: %s";
+
+    private static Map<String, HubSpotObjectType> createObjectTypeLookupMap() {
+        return Arrays.stream(HubSpotObjectType.values())
+                .collect(Collectors.toMap(HubSpotObjectType::getValue, Function.identity()));
+    }
 
     private volatile WebClientServiceProvider webClientServiceProvider;
 
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
             OBJECT_TYPE,

Review Comment:
   The processor should reset its state when the user changes the Object Type.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were created or modified after the previous run time" +
+                    " but before the current time.")

Review Comment:
   ```suggestion
                       " but before the current time (query time window).")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/resources/META-INF/LICENSE:
##########
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+

Review Comment:
   LICENSE / NOTICE files need to be added in the NAR module, not in the processors module.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -117,18 +160,28 @@ public class GetHubSpot extends AbstractProcessor {
 
     private static final String API_BASE_URI = "api.hubapi.com";
     private static final String HTTPS = "https";
-    private static final String CURSOR_PARAMETER = "after";
-    private static final String LIMIT_PARAMETER = "limit";
     private static final int TOO_MANY_REQUESTS = 429;
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final JsonFactory JSON_FACTORY = OBJECT_MAPPER.getFactory();
+    private static final Map<String, HubSpotObjectType> objectTypeLookupMap = createObjectTypeLookupMap();

Review Comment:
   Please always use upper case for constants.
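
   For illustration, a self-contained sketch of the renamed constant. The nested `ObjectType` enum is a simplified stand-in for `HubSpotObjectType`, and wrapping the map with `Collections.unmodifiableMap` is an extra assumption on my side, not something the PR does.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Map;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   // Hypothetical class for illustration only; not code from the pull request.
   final class ConstantNamingSketch {

       // Simplified stand-in for HubSpotObjectType.
       enum ObjectType {
           COMPANIES("/crm/v3/objects/companies"),
           CONTACTS("/crm/v3/objects/contacts");

           private final String value;

           ObjectType(final String value) {
               this.value = value;
           }

           String getValue() {
               return value;
           }
       }

       // static final field, hence UPPER_SNAKE_CASE instead of objectTypeLookupMap.
       static final Map<String, ObjectType> OBJECT_TYPE_LOOKUP_MAP = Collections.unmodifiableMap(
               Arrays.stream(ObjectType.values())
                       .collect(Collectors.toMap(ObjectType::getValue, Function.identity())));

       public static void main(final String[] args) {
           System.out.println(OBJECT_TYPE_LOOKUP_MAP.get("/crm/v3/objects/contacts"));
       }
   }
   ```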



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -75,6 +82,10 @@
 @DefaultSettings(yieldDuration = "10 sec")
 public class GetHubSpot extends AbstractProcessor {

Review Comment:
   Being a source processor, `@TriggerWhenEmpty` is not applicable here. Please remove it.
   
   Please also update the `@Stateful` description to cover the incremental loading state.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new PropertyDescriptor.Builder()
+            .name("initial-incremental-filter")
+            .displayName("Initial Incremental Start Time")

Review Comment:
   It is a good practice to align the property name, display name and variable name.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java:
##########
@@ -18,97 +18,107 @@
 
 import org.apache.nifi.components.DescribedValue;
 
+import static org.apache.nifi.processors.hubspot.IncrementalFieldType.HS_LAST_MODIFIED_DATE;
+import static org.apache.nifi.processors.hubspot.IncrementalFieldType.LAST_MODIFIED_DATE;
+
 public enum HubSpotObjectType implements DescribedValue {
 
     COMPANIES(
             "/crm/v3/objects/companies",
             "Companies",
             "In HubSpot, the companies object is a standard CRM object. Individual company records can be used to store information about businesses" +
-                    " and organizations within company properties."
+                    " and organizations within company properties.",
+            HS_LAST_MODIFIED_DATE

Review Comment:
   The Additional Details page is not in sync with this entity list. Please update the documentation.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35.")

Review Comment:
   ```suggestion
               .description("The ending timestamp of the query time window will be adjusted earlier by the amount configured in this property." +
                       " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35." +
                       " Set this property to avoid missing objects when the clocks of your local machines and the HubSpot servers are not in sync.")
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -99,7 +110,39 @@ public class GetHubSpot extends AbstractProcessor {
             .description("The maximum number of results to request for each invocation of the Processor")
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .addValidator(StandardValidators.createLongValidator(1, 100, true))
+            .build();
+
+    static final PropertyDescriptor IS_INCREMENTAL = new PropertyDescriptor.Builder()
+            .name("is-incremental")
+            .displayName("Incremental Loading")
+            .description("The processor can incrementally load the queried objects so that each object is queried exactly once." +
+                    " For each query, the processor queries objects which were created or modified after the previous run time" +
+                    " but before the current time.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    static final PropertyDescriptor INCREMENTAL_DELAY = new PropertyDescriptor.Builder()
+            .name("incremental-delay")
+            .displayName("Incremental Delay")
+            .description("The ending timestamp of the time window will be adjusted earlier by the amount configured in this property." +
+                    " For example, with a property value of 10 seconds, an ending timestamp of 12:30:45 would be changed to 12:30:35.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(IS_INCREMENTAL, "true")
+            .build();
+
+    static final PropertyDescriptor INITIAL_INCREMENTAL_FILTER = new PropertyDescriptor.Builder()
+            .name("initial-incremental-filter")
+            .displayName("Initial Incremental Start Time")
+            .description("This property specifies the start time as Epoch Time that the processor applies when running the first request.")

Review Comment:
   It would be a better user experience to configure it in a human-readable format.
   Suggested format: 2022-09-22T21:50:18.000Z
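
   A minimal sketch of how such a value could be converted to the epoch milliseconds used in the filter, assuming the property holds an ISO-8601 instant as in the suggested format. `StartTimeSketch` and `toEpochMillis` are hypothetical names, not part of this PR.

   ```java
   import java.time.Instant;
   import java.time.format.DateTimeFormatter;

   // Hypothetical helper for illustration only; not code from the pull request.
   final class StartTimeSketch {

       // Parses an ISO-8601 instant such as 2022-09-22T21:50:18.000Z and returns epoch milliseconds.
       // Malformed input raises java.time.format.DateTimeParseException.
       static long toEpochMillis(final String isoTimestamp) {
           return Instant.from(DateTimeFormatter.ISO_INSTANT.parse(isoTimestamp)).toEpochMilli();
       }

       public static void main(final String[] args) {
           System.out.println(toEpochMillis("2022-09-22T21:50:18.000Z"));
       }
   }
   ```

   A property validator could run the same parse and report the exception message, so a malformed value is rejected at configuration time rather than on the first request.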



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+public enum IncrementalFieldType {
+    LAST_MODIFIED_DATE("lastmodifieddate"),
+    HS_LAST_MODIFIED_DATE("hs_lastmodifieddate");
+
+    final String value;

Review Comment:
   ```suggestion
       private final String value;
   
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = stateMap.getOrDefault(endIncrementalKey, context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
     }
 
     private StateMap getStateMap(final ProcessContext context) {

Review Comment:
   Please use `ProcessSession.getState()` and `setState()` instead of `ProcessContext.getStateManager().getState()` and `setState()`, because the former is transactional: the state is committed only if the session commits successfully (along with the FlowFiles sent out).
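   
   To illustrate the point about transactional state, here is a minimal sketch. `ToySession` and its methods are hypothetical stand-ins written for this example, not the NiFi API; the sketch only assumes that a session buffers state changes and publishes them on commit.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class TransactionalStateSketch {

       // Hypothetical stand-in for a session; NOT the NiFi ProcessSession API.
       static class ToySession {
           private final Map<String, String> committed; // durable state shared across sessions
           private Map<String, String> pending;         // changes buffered until commit

           ToySession(final Map<String, String> committed) {
               this.committed = committed;
           }

           // Returns a copy of the state as seen by this session.
           Map<String, String> getState() {
               return new HashMap<>(pending != null ? pending : committed);
           }

           // Buffers the new state; nothing is persisted yet.
           void setState(final Map<String, String> newState) {
               pending = new HashMap<>(newState);
           }

           // Publishes the buffered state, as a successful commit would.
           void commit() {
               if (pending != null) {
                   committed.clear();
                   committed.putAll(pending);
                   pending = null;
               }
           }

           // Discards the buffered state, as a failed session would.
           void rollback() {
               pending = null;
           }
       }

       public static void main(final String[] args) {
           final Map<String, String> store = new HashMap<>();
           store.put("cursor", "10");

           // The session fails: the cursor must not advance.
           final ToySession failed = new ToySession(store);
           final Map<String, String> first = failed.getState();
           first.put("cursor", "20");
           failed.setState(first);
           failed.rollback();
           System.out.println(store.get("cursor")); // prints 10

           // The session succeeds: the cursor advances together with the output.
           final ToySession ok = new ToySession(store);
           final Map<String, String> second = ok.getState();
           second.put("cursor", "20");
           ok.setState(second);
           ok.commit();
           System.out.println(store.get("cursor")); // prints 20
       }
   }
   ```

   With a non-transactional state manager the first case would already have stored `20`, so a retry after the failure would skip that page.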



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = stateMap.getOrDefault(endIncrementalKey, context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");

Review Comment:
   One end of the time window must be inclusive; otherwise objects whose timestamps fall exactly on a window boundary are lost.
   ```suggestion
                   greaterThanFilterNode.put("operator", "GTE");
   ```
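   
   A small self-contained sketch of why this matters, assuming consecutive windows share a boundary (the previous end becomes the next start). The class and method names are made up for the illustration and the filtering is done in memory rather than through the HubSpot search API.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;

   public class TimeWindowBoundaryDemo {

       // Both ends exclusive (GT / LT), as in the diff under review.
       static List<Long> open(final List<Long> timestamps, final long start, final long end) {
           return timestamps.stream()
                   .filter(t -> t > start && t < end)
                   .collect(Collectors.toList());
       }

       // Start inclusive, end exclusive (GTE / LT), as in the suggestion.
       static List<Long> halfOpen(final List<Long> timestamps, final long start, final long end) {
           return timestamps.stream()
                   .filter(t -> t >= start && t < end)
                   .collect(Collectors.toList());
       }

       public static void main(final String[] args) {
           // Hypothetical last-modified timestamps; 2000 sits exactly on the shared boundary.
           final List<Long> modified = List.of(1500L, 2000L, 2500L);

           // Two consecutive windows: 1000..2000 and 2000..3000.
           System.out.println(open(modified, 1000, 2000));     // prints [1500]
           System.out.println(open(modified, 2000, 3000));     // prints [2500], 2000 is never returned
           System.out.println(halfOpen(modified, 1000, 2000)); // prints [1500]
           System.out.println(halfOpen(modified, 2000, 3000)); // prints [2000, 2500]
       }
   }
   ```

   With half-open windows every timestamp belongs to exactly one window, so nothing is skipped and nothing is fetched twice.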



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -154,17 +207,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final String accessToken = context.getProperty(ACCESS_TOKEN).getValue();
         final String endpoint = context.getProperty(OBJECT_TYPE).getValue();
 
-        final StateMap state = getStateMap(context);
-        final URI uri = createUri(context, state);
+        final URI uri = getBaseUri(context);
 
-        final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri);
-        final AtomicInteger objectCountHolder = new AtomicInteger();
+        final AtomicInteger total = new AtomicInteger(-1);
+        final StateMap state = getStateMap(context);
+        final Map<String, String> stateMap = new HashMap<>(state.toMap());
+        final String filters = createIncrementalFilters(context, stateMap);
+        final HttpResponseEntity response = getHttpResponseEntity(accessToken, uri, filters);
 
         if (response.statusCode() == HttpResponseStatus.OK.getCode()) {
             FlowFile flowFile = session.create();
-            flowFile = session.write(flowFile, parseHttpResponse(context, endpoint, state, response, objectCountHolder));
-            if (objectCountHolder.get() > 0) {
+            flowFile = session.write(flowFile, parseHttpResponse(context, response, total, stateMap));
+            if (total.get() > 0) {
                 session.transfer(flowFile, REL_SUCCESS);
+                updateState(context, stateMap);

Review Comment:
   I would update the state with the new start/end times even when no new objects were found in this round, so that the progress is visible in the state variables.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = stateMap.getOrDefault(endIncrementalKey, context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;

Review Comment:
   These could be `final` too.



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {

Review Comment:
   ```suggestion
       private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
   ```



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);

Review Comment:
   Is it necessary to add the `objectType` in the state? The processor handles only one object type at a time.
   
   I'd suggest using more descriptive names:
   - query_time_window_start
   - query_time_window_end



##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/IncrementalFieldType.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hubspot;
+
+public enum IncrementalFieldType {
+    LAST_MODIFIED_DATE("lastmodifieddate"),
+    HS_LAST_MODIFIED_DATE("hs_lastmodifieddate");
+
+    final String value;
+
+    IncrementalFieldType(String value) {
+        this.value = value;
+    }
+
+    String getValue() {
+        return value;
+    }

Review Comment:
   ```suggestion
       public String getValue() {
           return value;
       }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] asfgit closed pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading
URL: https://github.com/apache/nifi/pull/6379




[GitHub] [nifi] Lehel44 commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r978434795


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = stateMap.getOrDefault(endIncrementalKey, context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
     }
 
     private StateMap getStateMap(final ProcessContext context) {

Review Comment:
   I think we don't want to update the state when the session fails.
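
   A minimal sketch of that idea, not the processor's actual code: the class
   below uses a plain in-memory map as a hypothetical stand-in for the
   processor state, and the names `DeferredStateUpdate`, `runCycle` and the
   `cursor` key are illustrative assumptions. The work for one cycle mutates
   a working copy of the state, and the copy is persisted only when that work
   completes without throwing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical illustration: state is persisted only after a successful cycle.
class DeferredStateUpdate {

    // Stand-in for the persisted processor state (assumption, not the NiFi API).
    private final Map<String, String> persistedState = new HashMap<>();

    // Returns a defensive copy so callers cannot modify the persisted state directly.
    Map<String, String> getPersistedState() {
        return new HashMap<>(persistedState);
    }

    // Runs one cycle. The callback receives a working copy of the state;
    // the copy replaces the persisted state only if the callback does not throw.
    boolean runCycle(final Consumer<Map<String, String>> work) {
        final Map<String, String> workingState = new HashMap<>(persistedState);
        try {
            work.accept(workingState);
        } catch (final RuntimeException e) {
            // Failure: discard the working copy, persisted state stays as it was.
            return false;
        }
        persistedState.clear();
        persistedState.putAll(workingState);
        return true;
    }
}
```

   With this shape, a cycle that stores a new cursor and then fails leaves the
   previously stored cursor in place, so the next run retries the same page.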




[GitHub] [nifi] Lehel44 commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r979481249


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,78 +257,158 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
-        return webClientServiceProvider.getWebClientService()
-                .get()
-                .uri(uri)
-                .header("Authorization", "Bearer " + accessToken)
-                .retrieve();
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final InputStream inputStream = IOUtils.toInputStream(filters, StandardCharsets.UTF_8);
+        try {
+            return webClientServiceProvider.getWebClientService()
+                    .post()
+                    .uri(uri)
+                    .header("Authorization", "Bearer " + accessToken)
+                    .header("Content-Type", "application/json")
+                    .body(inputStream, OptionalLong.of(inputStream.available()))
+                    .retrieve();
+        } catch (IOException e) {
+            throw new ProcessException("Could not transform incremental filters to input stream", e);
+        }
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    private String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = OBJECT_TYPE_LOOKUP_MAP.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
+
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+            final String initialStartTimeValue = context.getProperty(INCREMENTAL_INITIAL_START_TIME).getValue();
+            String initialStartTimeEpoch = getInitialStartTimeEpoch(initialStartTimeValue);
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(START_INCREMENTAL_KEY);
+            final String lastEndTime = stateMap.getOrDefault(END_INCREMENTAL_KEY, initialStartTimeEpoch);

Review Comment:
   Very good idea, thank you!


