Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/11/20 22:29:40 UTC

[GitHub] [nifi] turcsanyip commented on a diff in pull request #6504: NIFI-10618: Add Asana connector

turcsanyip commented on code in PR #6504:
URL: https://github.com/apache/nifi/pull/6504#discussion_r1027320316


##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api/pom.xml:
##########
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   The version and scope of `nifi-api` are defined in `dependencyManagement` in the parent pom, so they do not need to be specified here.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
           </dependency>
   ```
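
   For context, a managed entry of this kind generally looks like the sketch below. Which parent pom holds the entry and the `provided` scope are assumptions for illustration, not copied from the NiFi build.
   ```xml
   <!-- Illustrative sketch of a parent pom excerpt; location and scope are assumed -->
   <dependencyManagement>
       <dependencies>
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
               <version>1.19.0-SNAPSHOT</version>
               <!-- assumed scope: nifi-api is supplied by the framework at runtime -->
               <scope>provided</scope>
           </dependency>
       </dependencies>
   </dependencyManagement>
   ```
   With such an entry in place, child modules only need to declare `groupId` and `artifactId`, as in the suggestion above.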



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>

Review Comment:
   The `asana` dependency should have `provided` scope here because it is already available from `nifi-asana-services-api-nar`, which is the parent NAR of this NAR (jars from the parent NAR are loaded first and are visible to child NARs).
   ```suggestion
           <dependency>
               <groupId>com.asana</groupId>
               <artifactId>asana</artifactId>
               <scope>provided</scope>
           </dependency>
   ```
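
   For reference, the parent/child NAR relationship is normally declared as a dependency of type `nar` in the child NAR's pom. A minimal sketch, assuming the child module is named `nifi-asana-services-nar` (that name is hypothetical and does not appear in this review):
   ```xml
   <!-- Hypothetical excerpt of the child NAR's pom.xml; the module name is an assumption -->
   <dependencies>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-asana-services-api-nar</artifactId>
           <version>1.19.0-SNAPSHOT</version>
           <!-- type "nar" marks this artifact as the parent NAR of the bundle -->
           <type>nar</type>
       </dependency>
   </dependencies>
   ```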



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/pom.xml:
##########
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-asana-services-api</artifactId>
+        </dependency>

Review Comment:
   Similar to the `nifi-asana-services` pom:
   - no version is needed for `nifi-api`
   - the `compile` scope for `nifi-utils` is redundant
   - `asana` and `nifi-asana-services-api` should have `provided` scope
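
   Taken together, the affected entries could look roughly like this. It is a sketch derived from the three points above, not a formal suggestion; the other dependencies in the file are omitted and assumed unchanged.
   ```xml
   <!-- Sketch only: affected dependencies of nifi-asana-processors/pom.xml -->
   <dependencies>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-api</artifactId>
           <!-- version omitted: managed by the parent pom -->
       </dependency>
       <dependency>
           <groupId>com.asana</groupId>
           <artifactId>asana</artifactId>
           <scope>provided</scope>
       </dependency>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-utils</artifactId>
           <version>1.19.0-SNAPSHOT</version>
           <!-- explicit compile scope removed: it is Maven's default -->
       </dependency>
       <dependency>
           <groupId>org.apache.nifi</groupId>
           <artifactId>nifi-asana-services-api</artifactId>
           <scope>provided</scope>
       </dependency>
   </dependencies>
   ```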



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in the last successful query are stored in order to enable incremental loading and change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = "asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = "asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = "asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = "asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = "asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = "asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = "asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = "asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = "asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = "asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = "asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = "asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = "asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on its' tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notification about deleted objects are routed to this relationship. "
+                    + "Flow files will not have any payload. IDs of the resources no longer exist "
+                    + "are carried by the asana.gid attribute of the generated FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws InitializationException {

Review Comment:
   We don't use `synchronized` for `onScheduled()`. It is not necessary, because the framework always calls it on a single thread.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
+
+===========================================
+The MIT License
+===========================================
+
+The following binary components are provided under the MIT License
+
+  (MIT License) Java client library for the Asana API
+    The following NOTICE information applies:
+      Asana
+      Copyright (c) 2015

Review Comment:
   The Asana library brings in several transitive dependencies that also need to be listed in the NOTICE file (e.g. checker-qual). Please check the list of jars in the nar file after the build and add the necessary entries to the NOTICE file.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `compile` is the default scope, so we don't specify it explicitly unless it is needed.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-utils</artifactId>
               <version>1.19.0-SNAPSHOT</version>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.

Review Comment:
   As far as I can see, Jackson is not present in the built nar, so this entry should be deleted.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/pom.xml:
##########
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-asana-services-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   This is a duplicated dependency and should be deleted. `mockito-junit-jupiter` comes from the project root pom.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   Could you please organize the dependencies in all the poms using the following order:
   - nifi prod dependencies
   - 3rd party prod dependencies
   - test dependencies
   



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-asana-services-api</artifactId>
+        </dependency>

Review Comment:
   Similar to the `asana` dependency, it should be `provided`.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-asana-services-api</artifactId>
               <scope>provided</scope>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar

Review Comment:
   ```suggestion
   nifi-asana-processors-nar
   ```
   Please fix the header for the other NOTICE files too. It must match the name of the nar bundle.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.

Review Comment:
   Jackson is not bundled, so this entry should be deleted.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
+
+===========================================
+The MIT License
+===========================================
+
+The following binary components are provided under the MIT License
+
+  (MIT License) Java client library for the Asana API
+    The following NOTICE information applies:
+      Asana
+      Copyright (c) 2015

Review Comment:
   The `asana` dependency is `provided` in this nar (see my earlier comment) and therefore this entry should be deleted.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api/src/main/java/org/apache/nifi/controller/asana/AsanaClient.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.asana;
+
+import com.asana.models.Attachment;
+import com.asana.models.Project;
+import com.asana.models.ProjectMembership;
+import com.asana.models.ProjectStatus;
+import com.asana.models.Section;
+import com.asana.models.Story;
+import com.asana.models.Tag;
+import com.asana.models.Task;
+import com.asana.models.Team;
+import com.asana.models.User;
+
+import java.util.Map;
+
+/**
+ * This interface represents a client to Asana REST server, with some basic filtering options built in.
+ */
+public interface AsanaClient {
+    /**
+     * Find & retrieve an Asana project by its name. If multiple projects match, returns the first.
+     * If there is no match, then {@link RuntimeException} is thrown. Note that constant ordering

Review Comment:
   Please do not throw raw `RuntimeException`s in the code. Please use `ProcessException` or `UncheckedIOException` instead, or you can also define a custom runtime exception like `AsanaException`.
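
   A minimal sketch of the custom-exception option, in case it helps. `AsanaException` is only the name suggested above, and `ProjectLookup` with its map-based lookup is a hypothetical stand-in for the real client code, not something taken from the PR:

   ```java
   import java.util.Map;

   // Hypothetical unchecked exception for Asana-related failures (name as suggested above).
   class AsanaException extends RuntimeException {
       AsanaException(String message) {
           super(message);
       }

       AsanaException(String message, Throwable cause) {
           super(message, cause);
       }
   }

   // Hypothetical helper that only illustrates where the exception would be thrown.
   class ProjectLookup {
       // Returns the gid registered under the given project name,
       // or throws AsanaException instead of a raw RuntimeException.
       static String findProjectGidByName(Map<String, String> gidByName, String name) {
           String gid = gidByName.get(name);
           if (gid == null) {
               throw new AsanaException("No such project: " + name);
           }
           return gid;
       }
   }
   ```

   Callers can then catch `AsanaException` specifically, and the Javadoc can reference it with `{@link AsanaException}` instead of `RuntimeException`.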



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in the last successful query are stored in order to enable incremental loading and change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = "asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = "asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = "asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = "asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = "asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = "asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = "asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = "asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = "asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = "asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = "asana-collect-project-events";

Review Comment:
   I'd suggest creating an `enum` for these allowable values, like [HubSpotObjectType](https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java).
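
   A rough sketch of what such an enum could look like. The name `AsanaObjectType` and the `fromValue` helper are assumptions, and only three of the twelve values are shown. The sketch is standalone so that it compiles without NiFi on the classpath; in the processor it would presumably implement `DescribedValue`, as `HubSpotObjectType` does:

   ```java
   import java.util.Arrays;

   // Hypothetical enum replacing the AV_NAME_* constants and AllowableValue fields.
   // Values, display names and descriptions are copied from the constants in the diff.
   enum AsanaObjectType {
       COLLECT_TASKS("asana-collect-tasks", "Tasks",
               "Collect tasks matching to the specified conditions."),
       COLLECT_PROJECTS("asana-collect-projects", "Projects",
               "Collect projects of the workspace."),
       COLLECT_TAGS("asana-collect-tags", "Tags",
               "Collect tags of the workspace.");

       private final String value;
       private final String displayName;
       private final String description;

       AsanaObjectType(String value, String displayName, String description) {
           this.value = value;
           this.displayName = displayName;
           this.description = description;
       }

       String getValue() {
           return value;
       }

       String getDisplayName() {
           return displayName;
       }

       String getDescription() {
           return description;
       }

       // Resolves the enum constant from the stored property value.
       static AsanaObjectType fromValue(String value) {
           return Arrays.stream(values())
                   .filter(type -> type.value.equals(value))
                   .findFirst()
                   .orElseThrow(() -> new IllegalArgumentException("Unknown object type: " + value));
       }
   }
   ```

   With this in place, the property value can be resolved via `AsanaObjectType.fromValue(...)` and handled in a `switch`, instead of comparing against separate string constants.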



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
+
+===========================================
+The MIT License
+===========================================
+
+The following binary components are provided under the MIT License
+
+  (MIT License) Java client library for the Asana API
+    The following NOTICE information applies:
+      Asana
+      Copyright (c) 2015

Review Comment:
   Similar to `nifi-asana-services-nar`'s NOTICE file:
   - Jackson is not present
   - Asana is provided and should be deleted from here



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in the last successful query are stored in order to enable incremental loading and change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = "asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = "asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = "asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = "asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = "asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = "asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = "asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = "asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = "asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = "asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = "asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = "asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = "asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on its tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notifications about deleted objects are routed to this relationship. "
+                    + "Flow files will not have any payload. IDs of the resources that no longer exist "
+                    + "are carried by the asana.gid attribute of the generated FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws InitializationException {
+        AsanaClientProviderService controllerService = context.getProperty(PROP_ASANA_CONTROLLER_SERVICE).asControllerService(AsanaClientProviderService.class);
+        AsanaClient client = controllerService.createClient();
+        batchSize = context.getProperty(PROP_ASANA_OUTPUT_BATCH_SIZE).asInteger();
+
+        try {
+            getLogger().debug("Initializing object fetcher...");
+            objectFetcher = createObjectFetcher(context, client);
+        } catch (Exception e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

Review Comment:
   We don't use `synchronized` for `onTrigger()`. `@TriggerSerially` ensures that the framework will call it on a single thread at a time.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in the last successful query are stored in order to enable incremental loading and change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = "asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = "asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = "asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = "asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = "asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = "asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = "asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = "asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = "asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = "asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = "asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = "asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = "asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on its tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notifications about deleted objects are routed to this relationship. "
+                    + "Flow files will not have any payload. IDs of the resources that no longer exist "
+                    + "are carried by the asana.gid attribute of the generated FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws InitializationException {
+        AsanaClientProviderService controllerService = context.getProperty(PROP_ASANA_CONTROLLER_SERVICE).asControllerService(AsanaClientProviderService.class);
+        AsanaClient client = controllerService.createClient();
+        batchSize = context.getProperty(PROP_ASANA_OUTPUT_BATCH_SIZE).asInteger();
+
+        try {
+            getLogger().debug("Initializing object fetcher...");
+            objectFetcher = createObjectFetcher(context, client);
+        } catch (Exception e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        try {
+            Map<String, String> state = recoverState(context).orElse(Collections.emptyMap());
+            getLogger().debug("Attempting to load state: {}", state);
+            objectFetcher.loadState(state);
+        } catch (Exception e) {
+            getLogger().info("Failed to recover state. Falling back to clean start.");
+            objectFetcher.clearState();
+        }
+        getLogger().debug("Initial state: {}", objectFetcher.saveState());
+
+        Collection<FlowFile> newItems = new ArrayList<>();
+        Collection<FlowFile> updatedItems = new ArrayList<>();
+        Collection<FlowFile> removedItems = new ArrayList<>();
+        Map<AsanaObjectState, Collection<FlowFile>> flowFiles = new HashMap<>();
+        flowFiles.put(AsanaObjectState.NEW, newItems);
+        flowFiles.put(AsanaObjectState.UPDATED, updatedItems);
+        flowFiles.put(AsanaObjectState.REMOVED, removedItems);
+
+        List<AsanaObject> allObjects = new ArrayList<>();
+
+        AsanaObject nextObject;
+        while ((nextObject = objectFetcher.fetchNext()) != null) {
+            allObjects.add(nextObject);
+        }
+
+        Map<AsanaObjectState, List<AsanaObject>> allObjectsByState = allObjects.stream()
+                .collect(groupingBy(AsanaObject::getState));
+
+        if (batchSize == 1) {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> asanaObjects.forEach(
+                            asanaObject -> {
+                                final Map<String, String> attributes = new HashMap<>(2);
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), ContentType.APPLICATION_JSON.getMimeType());
+                                attributes.put(ASANA_GID, asanaObject.getGid());
+                                FlowFile flowFile = createFlowFileWithStringPayload(session, asanaObject.getContent());
+                                flowFile = session.putAllAttributes(flowFile, attributes);
+                                flowFiles.get(asanaObject.getState()).add(flowFile);
+                            }
+                    ));
+        } else {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> partition(asanaObjects, batchSize).forEach(
+                            asanaObjectsInPartition -> {
+                                FlowFile flowFile = createFlowFileWithStringPayload(session, format("[%s]",
+                                        asanaObjectsInPartition.stream().map(AsanaObject::getContent)
+                                                .collect(joining(","))));
+                                flowFile = session.putAllAttributes(flowFile,
+                                        singletonMap(CoreAttributes.MIME_TYPE.key(),
+                                                ContentType.APPLICATION_JSON.getMimeType()));
+                                flowFiles.get(asanaObjectState).add(flowFile);
+                            }
+                    ));
+        }
+
+        if (flowFiles.values().stream().allMatch(Collection::isEmpty)) {
+            context.yield();
+            getLogger().debug("Yielding, as there are no new FlowFiles.");

Review Comment:
   I believe we can return at this point.
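
A rough, framework-free sketch of the control flow I have in mind. `EarlyReturnSketch`, `transferOrYield`, and the `log` list are hypothetical stand-ins (the log entries take the place of `context.yield()` and `session.transfer(...)`), and I am assuming that the code after this point only transfers the collected FlowFiles:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class EarlyReturnSketch {

    // Mirrors the three buckets used in onTrigger().
    enum State { NEW, UPDATED, REMOVED }

    // Hypothetical stand-in for the tail of onTrigger().
    // Returns true if anything was transferred, false if the method yielded.
    static boolean transferOrYield(Map<State, Collection<String>> flowFiles, List<String> log) {
        if (flowFiles.values().stream().allMatch(Collection::isEmpty)) {
            log.add("yield"); // stands in for context.yield()
            return false;     // nothing was created, so the remaining steps can be skipped
        }
        // Assumption: the rest of the method routes each non-empty bucket to its relationship.
        flowFiles.forEach((state, items) -> {
            if (!items.isEmpty()) {
                log.add("transfer " + state + " x" + items.size());
            }
        });
        return true;
    }

    public static void main(String[] args) {
        Map<State, Collection<String>> flowFiles = new EnumMap<>(State.class);
        flowFiles.put(State.NEW, new ArrayList<>());
        List<String> log = new ArrayList<>();
        System.out.println(transferOrYield(flowFiles, log) + " " + log);

        flowFiles.get(State.NEW).add("task-1");
        log.clear();
        System.out.println(transferOrYield(flowFiles, log) + " " + log);
    }
}
```

With the early return, the empty case exits right after yielding and the transfer logic below no longer needs to handle empty collections.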



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in the last successful query are stored in order to enable incremental loading and change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = "asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = "asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = "asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = "asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = "asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = "asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = "asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = "asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = "asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = "asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = "asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = "asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = "asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on its' tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notification about deleted objects are routed to this relationship. "
+                    + "Flow files will not have any payload. IDs of the resources no longer exist "
+                    + "are carried by the asana.gid attribute of the generated FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws InitializationException {
+        AsanaClientProviderService controllerService = context.getProperty(PROP_ASANA_CONTROLLER_SERVICE).asControllerService(AsanaClientProviderService.class);
+        AsanaClient client = controllerService.createClient();
+        batchSize = context.getProperty(PROP_ASANA_OUTPUT_BATCH_SIZE).asInteger();
+
+        try {
+            getLogger().debug("Initializing object fetcher...");
+            objectFetcher = createObjectFetcher(context, client);
+        } catch (Exception e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        try {
+            Map<String, String> state = recoverState(context).orElse(Collections.emptyMap());
+            getLogger().debug("Attempting to load state: {}", state);
+            objectFetcher.loadState(state);
+        } catch (Exception e) {
+            getLogger().info("Failed to recover state. Falling back to clean start.");
+            objectFetcher.clearState();
+        }
+        getLogger().debug("Initial state: {}", objectFetcher.saveState());
+
+        Collection<FlowFile> newItems = new ArrayList<>();
+        Collection<FlowFile> updatedItems = new ArrayList<>();
+        Collection<FlowFile> removedItems = new ArrayList<>();
+        Map<AsanaObjectState, Collection<FlowFile>> flowFiles = new HashMap<>();
+        flowFiles.put(AsanaObjectState.NEW, newItems);
+        flowFiles.put(AsanaObjectState.UPDATED, updatedItems);
+        flowFiles.put(AsanaObjectState.REMOVED, removedItems);
+
+        List<AsanaObject> allObjects = new ArrayList<>();
+
+        AsanaObject nextObject;
+        while ((nextObject = objectFetcher.fetchNext()) != null) {
+            allObjects.add(nextObject);
+        }
+
+        Map<AsanaObjectState, List<AsanaObject>> allObjectsByState = allObjects.stream()
+                .collect(groupingBy(AsanaObject::getState));
+
+        if (batchSize == 1) {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> asanaObjects.forEach(
+                            asanaObject -> {
+                                final Map<String, String> attributes = new HashMap<>(2);
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), ContentType.APPLICATION_JSON.getMimeType());
+                                attributes.put(ASANA_GID, asanaObject.getGid());
+                                FlowFile flowFile = createFlowFileWithStringPayload(session, asanaObject.getContent());
+                                flowFile = session.putAllAttributes(flowFile, attributes);
+                                flowFiles.get(asanaObject.getState()).add(flowFile);
+                            }
+                    ));
+        } else {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> partition(asanaObjects, batchSize).forEach(
+                            asanaObjectsInPartition -> {
+                                FlowFile flowFile = createFlowFileWithStringPayload(session, format("[%s]",
+                                        asanaObjectsInPartition.stream().map(AsanaObject::getContent)
+                                                .collect(joining(","))));
+                                flowFile = session.putAllAttributes(flowFile,
+                                        singletonMap(CoreAttributes.MIME_TYPE.key(),
+                                                ContentType.APPLICATION_JSON.getMimeType()));
+                                flowFiles.get(asanaObjectState).add(flowFile);
+                            }
+                    ));
+        }
+
+        if (flowFiles.values().stream().allMatch(Collection::isEmpty)) {
+            context.yield();
+            getLogger().debug("Yielding, as there are no new FlowFiles.");
+        } else {
+            session.transfer(newItems, REL_NEW);
+            session.transfer(updatedItems, REL_UPDATED);
+            session.transfer(removedItems, REL_REMOVED);
+            session.commitAsync();
+        }
+        Map<String, String> state = objectFetcher.saveState();
+        try {
+            persistState(state, context);
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }

Review Comment:
   You can handle the state in the same transaction as the session if you use `session.setState()` instead of `context.getStateManager().setState()` in `persistState()`. I would also move the exception handling into that method.
   In that case the code would look like this:
   ```suggestion
           } else {
               session.transfer(newItems, REL_NEW);
               session.transfer(updatedItems, REL_UPDATED);
               session.transfer(removedItems, REL_REMOVED);
               
               Map<String, String> state = objectFetcher.saveState();
               persistState(state, session);
               
               session.commitAsync();
           }
   ```
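
   For the `persistState()` side of that change, a minimal sketch could look like the one below. It is only an illustration, not the PR's actual code: `PersistStateSketch`, `Session`, and the error message are hypothetical, and the nested `Scope`, `Session`, and `ProcessException` types are stand-ins so the snippet compiles without NiFi on the classpath. In the processor they would be `org.apache.nifi.components.state.Scope`, `org.apache.nifi.processor.ProcessSession`, and `org.apache.nifi.processor.exception.ProcessException`.

   ```java
   import java.io.IOException;
   import java.util.Map;

   public class PersistStateSketch {

       // Stand-in for org.apache.nifi.components.state.Scope (assumption for this sketch).
       public enum Scope { CLUSTER, LOCAL }

       // Stand-in for the single ProcessSession method used here (assumption for this sketch).
       public interface Session {
           void setState(Map<String, String> state, Scope scope) throws IOException;
       }

       // Stand-in for org.apache.nifi.processor.exception.ProcessException.
       public static class ProcessException extends RuntimeException {
           public ProcessException(final String message, final Throwable cause) {
               super(message, cause);
           }
       }

       private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;

       // The IOException is handled here, so the caller in onTrigger() needs no try/catch.
       public static void persistState(final Map<String, String> state, final Session session) {
           try {
               session.setState(state, STATE_STORAGE_SCOPE);
           } catch (final IOException e) {
               throw new ProcessException("Failed to persist state", e);
           }
       }
   }
   ```

   With this shape the state update goes through the session before `session.commitAsync()`, and `onTrigger()` only has to call `persistState(state, session)`.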
