You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/23 09:19:00 UTC

[GitHub] [nifi] Lehel44 commented on a diff in pull request #6379: NIFI-10463: Fix GetHubSpot incremental loading

Lehel44 commented on code in PR #6379:
URL: https://github.com/apache/nifi/pull/6379#discussion_r978434795


##########
nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/GetHubSpot.java:
##########
@@ -187,61 +243,122 @@ private String getResponseBodyAsString(final ProcessContext context, final HttpR
         }
     }
 
-    private OutputStreamCallback parseHttpResponse(ProcessContext context, String endpoint, StateMap state, HttpResponseEntity response, AtomicInteger objectCountHolder) {
+    private OutputStreamCallback parseHttpResponse(final ProcessContext context, final HttpResponseEntity response, final AtomicInteger total,
+                                                   final Map<String, String> stateMap) {
         return out -> {
             try (final JsonParser jsonParser = JSON_FACTORY.createParser(response.body());
                  final JsonGenerator jsonGenerator = JSON_FACTORY.createGenerator(out, JsonEncoding.UTF8)) {
+                boolean isCursorAvailable = false;
+                final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+                final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
                 while (jsonParser.nextToken() != null) {
+                    if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
+                            .equals("total")) {
+                        jsonParser.nextToken();
+                        total.set(jsonParser.getIntValue());
+                    }
                     if (jsonParser.getCurrentToken() == JsonToken.FIELD_NAME && jsonParser.getCurrentName()
                             .equals("results")) {
                         jsonParser.nextToken();
                         jsonGenerator.copyCurrentStructure(jsonParser);
-                        objectCountHolder.incrementAndGet();
                     }
                     final String fieldName = jsonParser.getCurrentName();
-                    if (CURSOR_PARAMETER.equals(fieldName)) {
+                    if (PAGING_CURSOR.equals(fieldName)) {
+                        isCursorAvailable = true;
                         jsonParser.nextToken();
-                        Map<String, String> newStateMap = new HashMap<>(state.toMap());
-                        newStateMap.put(endpoint, jsonParser.getText());
-                        updateState(context, newStateMap);
+                        stateMap.put(cursorKey, jsonParser.getText());
                         break;
                     }
                 }
+                if (!isCursorAvailable) {
+                    stateMap.put(cursorKey, NO_PAGING);
+                }
             }
         };
     }
 
-    HttpUriBuilder getBaseUri(final ProcessContext context) {
+    URI getBaseUri(final ProcessContext context) {
         final String path = context.getProperty(OBJECT_TYPE).getValue();
         return webClientServiceProvider.getHttpUriBuilder()
                 .scheme(HTTPS)
                 .host(API_BASE_URI)
-                .encodedPath(path);
+                .encodedPath(path + "/search")
+                .build();
     }
 
-    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri) {
+    private HttpResponseEntity getHttpResponseEntity(final String accessToken, final URI uri, final String filters) {
+        final JsonInputStreamConverter converter = new JsonInputStreamConverter(filters);
         return webClientServiceProvider.getWebClientService()
-                .get()
+                .post()
                 .uri(uri)
                 .header("Authorization", "Bearer " + accessToken)
+                .header("Content-Type", "application/json")
+                .body(converter.getInputStream(), OptionalLong.of(converter.getByteSize()))
                 .retrieve();
     }
 
-    private URI createUri(final ProcessContext context, final StateMap state) {
-        final String path = context.getProperty(OBJECT_TYPE).getValue();
-        final HttpUriBuilder uriBuilder = getBaseUri(context);
+    String createIncrementalFilters(final ProcessContext context, final Map<String, String> stateMap) {
+        final String limit = context.getProperty(RESULT_LIMIT).getValue();
+        final String objectType = context.getProperty(OBJECT_TYPE).getValue();
+        final HubSpotObjectType hubSpotObjectType = objectTypeLookupMap.get(objectType);
+        final Long incrDelayMs = context.getProperty(INCREMENTAL_DELAY).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String startIncrementalKey = String.format("start: %s", objectType);
+        final String endIncrementalKey = String.format("end: %s", objectType);
+        final String cursorKey = String.format(CURSOR_KEY_PATTERN, objectType);
+
+        final ObjectNode root = OBJECT_MAPPER.createObjectNode();
+        root.put("limit", limit);
 
-        final boolean isLimitSet = context.getProperty(RESULT_LIMIT).evaluateAttributeExpressions().isSet();
-        if (isLimitSet) {
-            final String limit = context.getProperty(RESULT_LIMIT).getValue();
-            uriBuilder.addQueryParameter(LIMIT_PARAMETER, limit);
+        final String cursor = stateMap.get(cursorKey);
+        if (cursor != null && !NO_PAGING.equals(cursor)) {
+            root.put(PAGING_CURSOR, stateMap.get(cursorKey));
         }
+        final boolean isIncremental = context.getProperty(IS_INCREMENTAL).asBoolean();
+        if (isIncremental) {
+
+            final String hubspotSpecificIncrementalFieldName = hubSpotObjectType.getLastModifiedDateType().getValue();
+            final String lastStartTime = stateMap.get(startIncrementalKey);
+            final String lastEndTime = stateMap.getOrDefault(endIncrementalKey, context.getProperty(INITIAL_INCREMENTAL_FILTER).getValue());
+
+            String currentStartTime;
+            String currentEndTime;
+
+            if (cursor != null && !NO_PAGING.equals(cursor)) {
+                currentStartTime = lastStartTime;
+                // lastEndTime can be null if incremental loading was turned off beforehand
+                currentEndTime = lastEndTime != null ? lastEndTime : String.valueOf(getCurrentEpochTime());
+            } else {
+                currentStartTime = lastEndTime;
+                final long delayedCurrentEndTime = incrDelayMs != null ? getCurrentEpochTime() - incrDelayMs : getCurrentEpochTime();
+                currentEndTime = String.valueOf(delayedCurrentEndTime);
 
-        final String cursor = state.get(path);
-        if (cursor != null) {
-            uriBuilder.addQueryParameter(CURSOR_PARAMETER, cursor);
+                stateMap.put(startIncrementalKey, currentStartTime);
+                stateMap.put(endIncrementalKey, currentEndTime);
+            }
+
+            final ArrayNode filters = OBJECT_MAPPER.createArrayNode();
+
+            if (currentStartTime != null) {
+                final ObjectNode greaterThanFilterNode = OBJECT_MAPPER.createObjectNode();
+                greaterThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+                greaterThanFilterNode.put("operator", "GT");
+                greaterThanFilterNode.put("value", currentStartTime);
+                filters.add(greaterThanFilterNode);
+            }
+
+            final ObjectNode lessThanFilterNode = OBJECT_MAPPER.createObjectNode();
+            lessThanFilterNode.put("propertyName", hubspotSpecificIncrementalFieldName);
+            lessThanFilterNode.put("operator", "LT");
+            lessThanFilterNode.put("value", currentEndTime);
+            filters.add(lessThanFilterNode);
+
+            root.set("filters", filters);
         }
-        return uriBuilder.build();
+        return root.toString();
+    }
+
+    long getCurrentEpochTime() {
+        return Instant.now().toEpochMilli();
     }
 
     private StateMap getStateMap(final ProcessContext context) {

Review Comment:
   I think we don't want to update the state when the session fails.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org