You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2020/01/08 18:27:00 UTC

[nifi] branch master updated: NIFI-6939: Upgrade Atlas client dependency to 2.0.0 NIFI-6939: Review changes

This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cc74534  NIFI-6939: Upgrade Atlas client dependency to 2.0.0 NIFI-6939: Review changes
cc74534 is described below

commit cc74534bc0c4556816e2f3d9b04dbf76f73ab22c
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Thu Dec 19 16:53:42 2019 +0100

    NIFI-6939: Upgrade Atlas client dependency to 2.0.0
    NIFI-6939: Review changes
    
    This closes #3944
---
 .../nifi-atlas-reporting-task/pom.xml              |  37 ++-----
 .../org/apache/nifi/atlas/hook/NiFiAtlasHook.java  |  21 ++--
 .../apache/nifi/atlas/hook/NotificationSender.java |  66 +++++++-----
 .../AbstractNiFiProvenanceEventAnalyzer.java       |   2 +-
 .../apache/nifi/atlas/provenance/DataSetRefs.java  |   2 +-
 .../provenance/analyzer/AbstractHiveAnalyzer.java  |   2 +-
 .../nifi/atlas/provenance/analyzer/FilePath.java   |   2 +-
 .../nifi/atlas/provenance/analyzer/HBaseTable.java |   2 +-
 .../nifi/atlas/provenance/analyzer/HDFSPath.java   |   2 +-
 .../nifi/atlas/provenance/analyzer/Hive2JDBC.java  |   2 +-
 .../nifi/atlas/provenance/analyzer/KafkaTopic.java |   2 +-
 .../atlas/provenance/analyzer/NiFiRemotePort.java  |   2 +-
 .../provenance/analyzer/NiFiRootGroupPort.java     |   2 +-
 .../provenance/analyzer/PutHiveStreaming.java      |   2 +-
 .../analyzer/unknown/UnknownDataSet.java           |   2 +-
 .../provenance/analyzer/unknown/UnknownInput.java  |   2 +-
 .../provenance/analyzer/unknown/UnknownOutput.java |   2 +-
 .../lineage/AbstractLineageStrategy.java           |  25 +++--
 .../lineage/CompleteFlowPathLineage.java           |   2 +-
 .../atlas/provenance/lineage/LineageContext.java   |   4 +-
 .../provenance/lineage/SimpleFlowPathLineage.java  |   2 +-
 .../nifi/atlas/reporting/ReportLineageToAtlas.java |  71 +++++++------
 .../org/apache/nifi/atlas/security/Kerberos.java   |  18 ++--
 .../atlas/emulator/AtlasAPIV2ServerEmulator.java   |  23 ++--
 .../emulator/AtlasNotificationServerEmulator.java  |   8 +-
 .../apache/nifi/atlas/emulator/EmbeddedKafka.java  |  73 ++-----------
 .../apache/nifi/atlas/hook/TestNiFiAtlasHook.java  |  10 +-
 .../nifi/atlas/hook/TestNotificationSender.java    | 116 +++++++++++----------
 .../atlas/provenance/analyzer/TestHBaseTable.java  |   2 +-
 .../atlas/provenance/analyzer/TestHDFSPath.java    |   2 +-
 .../atlas/provenance/analyzer/TestHive2JDBC.java   |   2 +-
 .../atlas/provenance/analyzer/TestKafkaTopic.java  |   2 +-
 .../provenance/analyzer/TestNiFiRemotePort.java    |   2 +-
 .../provenance/analyzer/TestNiFiRootGroupPort.java |   2 +-
 .../provenance/analyzer/TestPutHiveStreaming.java  |   2 +-
 .../provenance/analyzer/TestUnknownDataSet.java    |   2 +-
 .../atlas/reporting/ITReportLineageToAtlas.java    |  34 +++++-
 .../atlas/reporting/TestReportLineageToAtlas.java  | 109 ++++++++++++++++---
 .../test/resources/atlas-application.properties    |   2 +
 .../src/test/resources/server.properties           |  61 +++++++----
 nifi-nar-bundles/nifi-atlas-bundle/pom.xml         |  86 +--------------
 41 files changed, 403 insertions(+), 409 deletions(-)

diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
index 353cf51..d9d8e39 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
@@ -48,49 +48,24 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-credentials-service-api</artifactId>
         </dependency>
-        <!-- Explicitly force beanutils 1.9.3 because versions prior to 1.9.2 had a vuln
-             Can remove this once atlas client which depends on hadoop-common uses a more recent version -->
+
+        <!-- Explicitly force beanutils 1.9.4 in order to avoid vulnerabilities in earlier versions.
+             Can remove this once atlas client which depends on hadoop-common uses a more recent version. -->
         <dependency>
             <groupId>commons-beanutils</groupId>
             <artifactId>commons-beanutils</artifactId>
             <version>1.9.4</version>
         </dependency>
+
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-client</artifactId>
-            <!-- Exclude dependencies to reduce NAR file size -->
-            <exclusions>
-                <!-- NOTE: Scala is required by atlas notification -->
-                <!--
-                   fastutil-6.5.16.jar is 16MB.
-                   'fastutil' is only used by
-                   org.apache.atlas.repository.memory.AttributeStores
-                   which is deprecated as being part of V1 API.
-                -->
-                <exclusion>
-                    <groupId>it.unimi.dsi</groupId>
-                    <artifactId>fastutil</artifactId>
-                </exclusion>
-                <!-- Explicit dep referred to in POM above.  commons-beanutils and commons-beanutils-core merged in 1.9.0 -->
-                <exclusion>
-                    <groupId>commons-beanutils</groupId>
-                    <artifactId>commons-beanutils-core</artifactId>
-                </exclusion>
-            </exclusions>
+            <artifactId>atlas-client-v2</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-notification</artifactId>
         </dependency>
-        <!--
-        NOTE: Could not use nifi-hadoop-libraries-nar because hadoop-client uses httpclient-4.2.5,
-        but atlas-client uses httpclient-4.5.3.
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        -->
+
         <dependency>
             <groupId>org.codehaus.jettison</groupId>
             <artifactId>jettison</artifactId>
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
index 4962a70..3f778e3 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
@@ -17,7 +17,7 @@
 package org.apache.nifi.atlas.hook;
 
 import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.nifi.atlas.NiFiAtlasClient;
 import org.apache.nifi.atlas.provenance.lineage.LineageContext;
 
@@ -26,38 +26,29 @@ import java.util.List;
 
 /**
  * This class is not thread-safe as it holds uncommitted notification messages within instance.
- * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ * {@link #addMessage(HookNotification)} and {@link #commitMessages()} should be used serially from a single thread.
  */
 public class NiFiAtlasHook extends AtlasHook implements LineageContext {
 
     public static final String NIFI_USER = "nifi";
 
-    private static final String CONF_PREFIX = "atlas.hook.nifi.";
-    private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-
     private NiFiAtlasClient atlasClient;
 
     public void setAtlasClient(NiFiAtlasClient atlasClient) {
         this.atlasClient = atlasClient;
     }
 
-    @Override
-    protected String getNumberOfRetriesPropertyKey() {
-        return HOOK_NUM_RETRIES;
-    }
-
-
-    private final List<HookNotificationMessage> messages = new ArrayList<>();
+    private final List<HookNotification> messages = new ArrayList<>();
 
     @Override
-    public void addMessage(HookNotificationMessage message) {
+    public void addMessage(HookNotification message) {
         messages.add(message);
     }
 
     public void commitMessages() {
         final NotificationSender notificationSender = createNotificationSender();
         notificationSender.setAtlasClient(atlasClient);
-        List<HookNotificationMessage> messagesBatch = new ArrayList<>(messages);
+        List<HookNotification> messagesBatch = new ArrayList<>(messages);
         messages.clear();
         notificationSender.send(messagesBatch, this::notifyEntities);
     }
@@ -72,7 +63,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
         return new NotificationSender();
     }
 
-    List<HookNotificationMessage> getMessages() {
+    List<HookNotification> getMessages() {
         return messages;
     }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
index 66c21df..0ae711f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -20,9 +20,11 @@ import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.atlas.AtlasUtils;
 import org.apache.nifi.atlas.NiFiAtlasClient;
 import org.apache.nifi.util.Tuple;
@@ -39,23 +41,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.function.Consumer;
+import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toMap;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.atlas.model.notification.HookNotification.HookNotificationType.ENTITY_CREATE;
+import static org.apache.atlas.model.notification.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
 import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
-import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
 import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
 
 /**
  * This class implements Atlas hook notification message deduplication mechanism.
@@ -175,7 +178,7 @@ class NotificationSender {
      * <p>Send hook notification messages.
      * In order to notify relationships between 'nifi_flow_path' and its inputs/outputs, this method sends messages in following order:</p>
      * <ol>
-     *     <li>As a a single {@link org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} message:
+     *     <li>As a single {@link org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest} message:
      *         <ul>
      *             <li>New entities except 'nifi_flow_path', including DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc,
      *             so that 'nifi_flow_path' can refer</li>
@@ -190,21 +193,21 @@ class NotificationSender {
      * @param messages list of messages to be sent
      * @param notifier responsible for sending notification messages, its accept method can be called multiple times
      */
-    void send(final List<HookNotification.HookNotificationMessage> messages, final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
+    void send(final List<HookNotification> messages, final BiConsumer<List<HookNotification>, UserGroupInformation> notifier) {
         logger.info("Sending {} messages to Atlas", messages.size());
 
         final Metrics metrics = new Metrics();
         try {
             metrics.totalMessages = messages.size();
 
-            final Map<Boolean, List<HookNotification.HookNotificationMessage>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
+            final Map<Boolean, List<HookNotification>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
 
-            final List<HookNotification.HookNotificationMessage> creates = safeGet(createAndOthers, true);
+            final List<HookNotification> creates = safeGet(createAndOthers, true);
             metrics.createMessages = creates.size();
 
             final Map<Boolean, List<Referenceable>> newFlowPathsAndOtherEntities = creates.stream()
-                    .flatMap(msg -> ((HookNotification.EntityCreateRequest) msg).getEntities().stream())
-                    .collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.typeName)));
+                    .flatMap(msg -> ((HookNotificationV1.EntityCreateRequest) msg).getEntities().stream())
+                    .collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.getTypeName())));
 
             // Deduplicate same entity creation messages.
             final List<Referenceable> newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false)
@@ -227,28 +230,28 @@ class NotificationSender {
             newEntities.addAll(newEntitiesExceptFlowPaths);
             newEntities.addAll(newFlowPaths);
             if (!newEntities.isEmpty()) {
-                notifier.accept(Collections.singletonList(new HookNotification.EntityCreateRequest(NIFI_USER, newEntities)));
+                notifier.accept(Collections.singletonList(new HookNotificationV1.EntityCreateRequest(NIFI_USER, newEntities)), null);
             }
 
-            final Map<Boolean, List<HookNotification.HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers
+            final Map<Boolean, List<HookNotification>> partialNiFiFlowPathUpdateAndOthers
                     = safeGet(createAndOthers, false).stream().collect(groupingBy(msg
                     -> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
-                    && TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName())
-                    && ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute())
+                    && TYPE_NIFI_FLOW_PATH.equals(((HookNotificationV1.EntityPartialUpdateRequest)msg).getTypeName())
+                    && ATTR_QUALIFIED_NAME.equals(((HookNotificationV1.EntityPartialUpdateRequest)msg).getAttribute())
             ));
 
 
             // These updates are made against existing flow path entities.
-            final List<HookNotification.HookNotificationMessage> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
-            final List<HookNotification.HookNotificationMessage> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
+            final List<HookNotification> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
+            final List<HookNotification> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
             metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
             metrics.otherMessages = otherMessages.size();
 
 
             // 2. Notify de-duplicated 'nifi_flow_path' updates
-            final List<HookNotification.HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotification.EntityPartialUpdateRequest) msg)
+            final List<HookNotification> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotificationV1.EntityPartialUpdateRequest) msg)
                     // Group by nifi_flow_path qualifiedName value.
-                    .collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
+                    .collect(groupingBy(HookNotificationV1.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
                     .map(entry -> {
                         final String flowPathQualifiedName = entry.getKey();
                         final Map<String, Referenceable> distinctInputs;
@@ -274,7 +277,7 @@ class NotificationSender {
                         }
 
                         // Merge all inputs and outputs for this nifi_flow_path.
-                        for (HookNotification.EntityPartialUpdateRequest msg : entry.getValue()) {
+                        for (HookNotificationV1.EntityPartialUpdateRequest msg : entry.getValue()) {
                             fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
                                     .entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey()))
                                     .forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue()));
@@ -290,17 +293,17 @@ class NotificationSender {
                         // org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable
                         flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values()));
                         flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values()));
-                        return new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+                        return new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
                                 ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef);
                     })
                     .filter(Objects::nonNull)
                     .collect(Collectors.toList());
 
             metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size();
-            notifier.accept(deduplicatedMessages);
+            notifier.accept(deduplicatedMessages, null);
 
             // 3. Notify other messages
-            notifier.accept(otherMessages);
+            notifier.accept(otherMessages, null);
 
         } finally {
             logger.info(metrics.toLogString("Finished"));
@@ -380,7 +383,7 @@ class NotificationSender {
             final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName);
 
             final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
-                if (id.isAssigned()) {
+                if (isAssigned(id)) {
                     // If this referenceable has Guid assigned, then add this one to cache.
                     guidToTypedQualifiedName.put(id._getId(), typedRefQualifiedName);
                 }
@@ -391,4 +394,15 @@ class NotificationSender {
         }).filter(tuple -> tuple.getValue() != null)
                 .collect(toMap(Tuple::getKey, Tuple::getValue));
     }
+
+    // Copy of org.apache.atlas.typesystem.persistence.Id.isAssigned() from v0.8.1. This method does not exist in v2.0.0.
+    private boolean isAssigned(Id id) {
+        try {
+            UUID.fromString(id.getId());
+        } catch (IllegalArgumentException e) {
+            return false;
+        }
+
+        return true;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
index 069276a..9269fdf 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.provenance.ProvenanceEventType;
 
 import java.net.MalformedURLException;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
index 4f745d1..4b038e4 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 
 import java.util.Collections;
 import java.util.LinkedHashSet;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
index 879b2ee..3b3b105 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.util.Tuple;
 
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
index 37df736..7a1a589 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
index 0f446fb..c9e7400 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
index b1ef828..5cf77ab 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
index ccbbc66..ffed41f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
index ff86166..acdaef4 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
index 69b2d47..f7ea33e 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.controller.status.ConnectionStatus;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
index e791c62..75ebb44 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.controller.status.ConnectionStatus;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
index 7b41c57..78c37ea 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
index 42e407d..0117b19 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer.unknown;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
index f16908b..850f4cf 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer.unknown;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
index 5d564c2..9fc9253 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer.unknown;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
index a253c7d..ba53bf0 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
@@ -17,8 +17,9 @@
 package org.apache.nifi.atlas.provenance.lineage;
 
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
 import org.apache.nifi.atlas.NiFiFlow;
 import org.apache.nifi.atlas.NiFiFlowPath;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
@@ -40,7 +41,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.nifi.atlas.AtlasUtils.toStr;
 import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
-import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
@@ -49,6 +49,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
 import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
 import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
 import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
 
 public abstract class AbstractLineageStrategy implements LineageStrategy {
 
@@ -131,7 +132,7 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
     }
 
     protected void createEntity(Referenceable ... entities) {
-        final HookNotification.EntityCreateRequest msg = new HookNotification.EntityCreateRequest(NIFI_USER, entities);
+        final HookNotificationV1.EntityCreateRequest msg = new HookNotificationV1.EntityCreateRequest(NIFI_USER, entities);
         lineageContext.addMessage(msg);
     }
 
@@ -146,11 +147,11 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
 
             refsToAdd.stream().filter(ref -> !existingRefTypedQualifiedNames.contains(toTypedQualifiedName.apply(ref)))
                     .forEach(ref -> {
-                        if (ref.getId().isUnassigned()) {
+                        if (isUnassigned(ref.getId())) {
                             // Create new entity.
                             logger.debug("Found a new DataSet reference from {} to {}, sending an EntityCreateRequest",
                                     new Object[]{toTypedQualifiedName.apply(nifiFlowPath), toTypedQualifiedName.apply(ref)});
-                            final HookNotification.EntityCreateRequest createDataSet = new HookNotification.EntityCreateRequest(NIFI_USER, ref);
+                            final HookNotificationV1.EntityCreateRequest createDataSet = new HookNotificationV1.EntityCreateRequest(NIFI_USER, ref);
                             lineageContext.addMessage(createDataSet);
                         }
                         refs.add(ref);
@@ -169,10 +170,18 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
         final boolean inputsAdded = addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS);
         final boolean outputsAdded = addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS);
         if (inputsAdded || outputsAdded) {
-            lineageContext.addMessage(new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+            lineageContext.addMessage(new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
                     ATTR_QUALIFIED_NAME, (String) flowPathRef.get(ATTR_QUALIFIED_NAME), flowPathRef));
         }
     }
 
-
+    // Copy of org.apache.atlas.typesystem.persistence.Id.isUnassigned() from v0.8.1. This method does not exist in v2.0.0.
+    private boolean isUnassigned(Id id) {
+        try {
+            long l = Long.parseLong(id.getId());
+            return l < 0;
+        } catch (NumberFormatException ne) {
+            return false;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
index d3ac658..03c9eaa 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.lineage;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.NiFiFlow;
 import org.apache.nifi.atlas.NiFiFlowPath;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
index 060cfbe..32405ed 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
@@ -16,8 +16,8 @@
  */
 package org.apache.nifi.atlas.provenance.lineage;
 
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.HookNotification;
 
 public interface LineageContext {
-    void addMessage(HookNotification.HookNotificationMessage message);
+    void addMessage(HookNotification message);
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
index 7ecbed5..4d86c78 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.lineage;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.NiFiFlow;
 import org.apache.nifi.atlas.NiFiFlowPath;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 80bad7a..1560c7d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -16,36 +16,10 @@
  */
 package org.apache.nifi.atlas.reporting;
 
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
-import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.Stream;
-
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SslConfigs;
@@ -57,10 +31,10 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.atlas.hook.NiFiAtlasHook;
 import org.apache.nifi.atlas.NiFiAtlasClient;
 import org.apache.nifi.atlas.NiFiFlow;
 import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.hook.NiFiAtlasHook;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
 import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
@@ -81,8 +55,8 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -93,7 +67,33 @@ import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
 import org.apache.nifi.ssl.SSLContextService;
 
-import com.sun.jersey.api.client.ClientResponse;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
 
 @Tags({"atlas", "lineage"})
 @CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
@@ -552,6 +552,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
 
         // Create Atlas configuration file if necessary.
         if (createAtlasConf) {
+            // enforce synchronous notification sending (needed for the checkpointing in ProvenanceEventConsumer)
+            atlasProperties.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
 
             atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, atlasConnectTimeoutMs);
             atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
@@ -568,6 +570,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
                         .format(Instant.now());
                 atlasProperties.store(fos, "Generated by Apache NiFi ReportLineageToAtlas ReportingTask at " + ts);
             }
+        } else {
+            // check if synchronous notification sending has been set (needed for the checkpointing in ProvenanceEventConsumer)
+            String isAsync = atlasProperties.getProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS);
+            if (isAsync == null || !isAsync.equalsIgnoreCase("false")) {
+                throw new ProcessException("Atlas property '" + AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS + "' must be set to 'false' in " + ATLAS_PROPERTIES_FILENAME + "." +
+                        " Sending notifications asynchronously is not supported by the reporting task.");
+            }
         }
 
         getLogger().debug("Force reloading Atlas application properties.");
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
index 41a2966..021a860 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
@@ -16,15 +16,6 @@
  */
 package org.apache.nifi.atlas.security;
 
-import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
-import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
 import org.apache.atlas.AtlasClientV2;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -34,6 +25,15 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
+
 public class Kerberos implements AtlasAuthN {
     private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
 
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
index b147810..0de6e8f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
@@ -22,10 +22,11 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
 import org.apache.nifi.atlas.AtlasUtils;
 import org.apache.nifi.atlas.NiFiTypes;
+import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Handler;
@@ -94,19 +95,19 @@ public class AtlasAPIV2ServerEmulator {
         server.start();
         logger.info("Starting {} on port {}", AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort());
 
-        embeddedKafka = new EmbeddedKafka(false);
+        embeddedKafka = new EmbeddedKafka();
         embeddedKafka.start();
 
         notificationServerEmulator.consume(m -> {
-            if (m instanceof HookNotification.EntityCreateRequest) {
-                HookNotification.EntityCreateRequest em = (HookNotification.EntityCreateRequest) m;
+            if (m instanceof HookNotificationV1.EntityCreateRequest) {
+                HookNotificationV1.EntityCreateRequest em = (HookNotificationV1.EntityCreateRequest) m;
                 for (Referenceable ref : em.getEntities()) {
                     final AtlasEntity entity = toEntity(ref);
                     createEntityByNotification(entity);
                 }
-            } else if (m instanceof HookNotification.EntityPartialUpdateRequest) {
-                HookNotification.EntityPartialUpdateRequest em
-                        = (HookNotification.EntityPartialUpdateRequest) m;
+            } else if (m instanceof HookNotificationV1.EntityPartialUpdateRequest) {
+                HookNotificationV1.EntityPartialUpdateRequest em
+                        = (HookNotificationV1.EntityPartialUpdateRequest) m;
                 final AtlasEntity entity = toEntity(em.getEntity());
                 entity.setAttribute(em.getAttribute(), em.getAttributeValue());
                 updateEntityByNotification(entity);
@@ -241,7 +242,11 @@ public class AtlasAPIV2ServerEmulator {
     }
 
     private static <T> T readInputJSON(HttpServletRequest req, Class<? extends T> clazz) throws IOException {
-        return new ObjectMapper().reader().withType(clazz).readValue(req.getInputStream());
+        return new ObjectMapper()
+                .configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+                .reader()
+                .withType(clazz)
+                .readValue(req.getInputStream());
     }
 
     private static final AtlasTypesDef atlasTypesDef = new AtlasTypesDef();
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java
index 9e3a787..e317923 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java
@@ -16,9 +16,9 @@
  */
 package org.apache.nifi.atlas.emulator;
 
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,7 +36,7 @@ public class AtlasNotificationServerEmulator {
 
     private volatile boolean isStopped;
 
-    public void consume(Consumer<HookNotification.HookNotificationMessage> c) {
+    public void consume(Consumer<HookNotification> c) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test");
@@ -53,8 +53,8 @@ public class AtlasNotificationServerEmulator {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records) {
                 final MessageDeserializer deserializer = NotificationInterface.NotificationType.HOOK.getDeserializer();
-                final HookNotification.HookNotificationMessage m
-                        = (HookNotification.HookNotificationMessage) deserializer.deserialize(record.value());
+                final HookNotification m
+                        = (HookNotification) deserializer.deserialize(record.value());
                 c.accept(m);
             }
         }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
index 40ec052..9338a55 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
@@ -28,8 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
 import java.util.Properties;
 
 /**
@@ -47,10 +45,6 @@ public class EmbeddedKafka {
 
     private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
 
-    private final int kafkaPort;
-
-    private final int zookeeperPort;
-
     private boolean started;
 
     /**
@@ -58,8 +52,8 @@ public class EmbeddedKafka {
      * configuration properties will be loaded from 'server.properties' and
      * 'zookeeper.properties' located at the root of the classpath.
      */
-    public EmbeddedKafka(boolean useRandomPort) {
-        this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"), useRandomPort);
+    public EmbeddedKafka() {
+        this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
     }
 
     /**
@@ -70,47 +64,14 @@ public class EmbeddedKafka {
      * @param zookeeperConfig
      *            Zookeeper configuration properties
      */
-    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig, boolean useRandomPort) {
+    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
         this.cleanupKafkaWorkDir();
-        this.zookeeperConfig = zookeeperConfig;
-        this.kafkaConfig = kafkaConfig;
 
-        if (useRandomPort) {
-            this.kafkaPort = this.availablePort();
-            this.zookeeperPort = this.availablePort();
-
-            this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
-            this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
-            this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
-        } else {
-            this.kafkaPort = Integer.parseInt(kafkaConfig.getProperty("port"));
-            this.zookeeperPort = Integer.parseInt(zookeeperConfig.getProperty("clientPort"));
-        }
+        this.kafkaConfig = kafkaConfig;
+        this.zookeeperConfig = zookeeperConfig;
 
-        this.zkServer = new ZooKeeperServer();
         this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
-    }
-
-    /**
-     *
-     * @return port for Kafka server
-     */
-    public int getKafkaPort() {
-        if (!this.started) {
-            throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
-        }
-        return this.kafkaPort;
-    }
-
-    /**
-     *
-     * @return port for Zookeeper server
-     */
-    public int getZookeeperPort() {
-        if (!this.started) {
-            throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
-        }
-        return this.zookeeperPort;
+        this.zkServer = new ZooKeeperServer();
     }
 
     /**
@@ -127,7 +88,7 @@ public class EmbeddedKafka {
             logger.info("Starting Kafka server");
             this.kafkaServer.startup();
 
-            logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
+            logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.staticServerConfig().port()
                     + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
             this.started = true;
         }
@@ -209,24 +170,4 @@ public class EmbeddedKafka {
             throw new IllegalStateException(e);
         }
     }
-
-    /**
-     * Will determine the available port used by Kafka/Zookeeper servers.
-     */
-    private int availablePort() {
-        ServerSocket s = null;
-        try {
-            s = new ServerSocket(0);
-            s.setReuseAddress(true);
-            return s.getLocalPort();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to discover available port.", e);
-        } finally {
-            try {
-                s.close();
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
index 98fd11e..bd7dca6 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.hook;
 
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.HookNotification;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,16 +40,16 @@ public class TestNiFiAtlasHook {
 
     @Test
     public void messagesListShouldContainMessagesAfterAddMessage() {
-        hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
-        hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
+        hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
+        hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
 
         assertEquals(2, hook.getMessages().size());
     }
 
     @Test
     public void messagesListShouldBeCleanedUpAfterCommit() {
-        hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
-        hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
+        hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
+        hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
 
         hook.commitMessages();
 
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
index 7d08810..2a9e768 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
@@ -19,8 +19,10 @@ package org.apache.nifi.atlas.hook;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.atlas.AtlasUtils;
 import org.apache.nifi.atlas.NiFiAtlasClient;
 import org.junit.Test;
@@ -34,7 +36,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
@@ -60,10 +62,10 @@ public class TestNotificationSender {
 
     private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class);
 
-    private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> {
-        private final List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>();
+    private static class Notifier implements BiConsumer<List<HookNotification>, UserGroupInformation> {
+        private final List<List<HookNotification>> notifications = new ArrayList<>();
         @Override
-        public void accept(List<HookNotification.HookNotificationMessage> messages) {
+        public void accept(List<HookNotification> messages, UserGroupInformation ugi) {
             logger.info("notified at {}, {}", notifications.size(), messages);
             notifications.add(messages);
         }
@@ -72,7 +74,7 @@ public class TestNotificationSender {
     @Test
     public void testZeroMessage() {
         final NotificationSender sender = new NotificationSender();
-        final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList();
+        final List<HookNotification> messages = Collections.emptyList();
         final Notifier notifier = new Notifier();
         sender.send(messages, notifier);
         assertEquals(0, notifier.notifications.get(0).size());
@@ -88,9 +90,9 @@ public class TestNotificationSender {
     @SuppressWarnings("unchecked")
     private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
         assertTrue(notifier.notifications.size() > notificationIndex);
-        final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+        final List<HookNotification> messages = notifier.notifications.get(notificationIndex);
         assertEquals(1, messages.size());
-        final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0);
+        final HookNotificationV1.EntityCreateRequest message = (HookNotificationV1.EntityCreateRequest) messages.get(0);
         assertEquals(expects.length, message.getEntities().size());
 
         // The use of 'flatMap' at NotificationSender does not preserve actual entities order.
@@ -135,11 +137,11 @@ public class TestNotificationSender {
     @SuppressWarnings("unchecked")
     private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
         assertTrue(notifier.notifications.size() > notificationIndex);
-        final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+        final List<HookNotification> messages = notifier.notifications.get(notificationIndex);
         assertEquals(expects.length, messages.size());
         for (int i = 0; i < expects.length; i++) {
             final Referenceable expect = expects[i];
-            final HookNotification.EntityPartialUpdateRequest actual = (HookNotification.EntityPartialUpdateRequest) messages.get(i);
+            final HookNotificationV1.EntityPartialUpdateRequest actual = (HookNotificationV1.EntityPartialUpdateRequest) messages.get(i);
             assertEquals(expect.getTypeName(), actual.getTypeName());
             assertEquals(ATTR_QUALIFIED_NAME, actual.getAttribute());
             assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.getAttributeValue());
@@ -155,8 +157,8 @@ public class TestNotificationSender {
     public void testOneCreateDataSetMessage() {
         final NotificationSender sender = new NotificationSender();
         final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test");
-        final List<HookNotification.HookNotificationMessage> messages = Collections.singletonList(
-                new HookNotification.EntityCreateRequest(NIFI_USER, queue1));
+        final List<HookNotification> messages = Collections.singletonList(
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, queue1));
         final Notifier notifier = new Notifier();
         sender.send(messages, notifier);
 
@@ -275,46 +277,46 @@ public class TestNotificationSender {
         ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22));
 
 
-        final List<HookNotification.HookNotificationMessage> messages = asList(
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data12),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA12),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork12),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB12),
-
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data22),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA22),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork22),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB22),
-
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data12),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA12),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork12),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB11),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB12),
-
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data22),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA22),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork22),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB21),
-                new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB22)
+        final List<HookNotification> messages = asList(
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data1),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data12),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathA11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathA12),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_fork11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_fork12),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathB11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathB12),
+
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data2),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data22),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathA21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathA22),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_fork21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_fork22),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathB21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathB22),
+
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data1),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data12),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathA11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathA12),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_fork11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_fork12),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathB11),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathB12),
+
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data2),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data22),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathA21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathA22),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_fork21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_fork22),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathB21),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathB22)
         );
 
         final Notifier notifier = new Notifier();
@@ -376,10 +378,10 @@ public class TestNotificationSender {
         newPath1Lineage.set(ATTR_INPUTS, singleton(fileC));
         newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD));
 
-        final List<HookNotification.HookNotificationMessage> messages = asList(
-                new HookNotification.EntityCreateRequest(NIFI_USER, fileC),
-                new HookNotification.EntityCreateRequest(NIFI_USER, fileD),
-                new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
+        final List<HookNotification> messages = asList(
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, fileC),
+                new HookNotificationV1.EntityCreateRequest(NIFI_USER, fileD),
+                new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
         );
 
         final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class);
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
index dddea2a..5217e74 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
index c02b72a..3d94a33 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
index 2f64afb..5d5fcd6 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
index cbe805c..b8a68fb 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
index 4585826..e4799b3 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
index a09f207..84777af 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
index c816348..5184025 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
index 01616b3..96623a5 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
@@ -16,7 +16,7 @@
  */
 package org.apache.nifi.atlas.provenance.analyzer;
 
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.nifi.atlas.provenance.AnalysisContext;
 import org.apache.nifi.atlas.provenance.DataSetRefs;
 import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
index 31293bb..55d45a2 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceRepository;
@@ -123,7 +124,46 @@ public class ITReportLineageToAtlas {
             throw new RuntimeException("Failed to parse template", e);
         }
 
-        return handler.getRootProcessGroupStatus();
+        ProcessGroupStatus rootPgStatus = handler.getRootProcessGroupStatus();
+
+        for (ConnectionStatus connectionStatus : rootPgStatus.getConnectionStatus()) {
+            connectionStatus.setSourceName(lookupComponentName(rootPgStatus, connectionStatus.getSourceId()));
+            connectionStatus.setDestinationName(lookupComponentName(rootPgStatus, connectionStatus.getDestinationId()));
+        }
+
+        return rootPgStatus;
+    }
+
+    /**
+     * Finds the name of the component with the given id among the processors, input ports,
+     * output ports and remote process groups listed directly in the given process group status.
+     *
+     * @param rootPgStatus status of the process group whose components are searched
+     * @param componentId id of the component to look up
+     * @return the name of the first matching component, or {@code null} if none matches
+     */
+    private String lookupComponentName(ProcessGroupStatus rootPgStatus, String componentId) {
+        for (ProcessorStatus processor : rootPgStatus.getProcessorStatus()) {
+            if (processor.getId().equals(componentId)) {
+                return processor.getName();
+            }
+        }
+        for (PortStatus inputPort : rootPgStatus.getInputPortStatus()) {
+            if (inputPort.getId().equals(componentId)) {
+                return inputPort.getName();
+            }
+        }
+        for (PortStatus outputPort : rootPgStatus.getOutputPortStatus()) {
+            if (outputPort.getId().equals(componentId)) {
+                return outputPort.getName();
+            }
+        }
+        for (RemoteProcessGroupStatus remoteGroup : rootPgStatus.getRemoteProcessGroupStatus()) {
+            if (remoteGroup.getId().equals(componentId)) {
+                return remoteGroup.getName();
+            }
+        }
+        return null; // nothing matched, e.g. funnels, which do not have names
     }
 
     private static class TemplateContentHander implements ContentHandler {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
index 18a4f37..bb4297f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
@@ -17,11 +17,15 @@
 package org.apache.nifi.atlas.reporting;
 
 import com.sun.jersey.api.client.Client;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.commons.configuration.Configuration;
 import org.apache.nifi.atlas.NiFiAtlasClient;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingInitializationContext;
 import org.apache.nifi.util.MockComponentLog;
@@ -35,11 +39,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -54,6 +61,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
 import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
 import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTRAP_SERVERS;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -128,7 +136,9 @@ public class TestReportLineageToAtlas {
     @Test
     public void testDefaultConnectAndReadTimeout() throws Exception {
         // GIVEN
-        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        String atlasConfDir = createAtlasConfDir();
+
+        Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
 
         // WHEN
         // THEN
@@ -141,21 +151,12 @@ public class TestReportLineageToAtlas {
         int expectedConnectTimeoutMs = 10000;
         int expectedReadTimeoutMs = 5000;
 
-        String atlasConfDir = "target/atlasConfDir";
-        File directory = new File(atlasConfDir);
-        if (!directory.exists()) {
-            directory.mkdirs();
-        }
+        String atlasConfDir = createAtlasConfDir();
 
-        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
         properties.put(ATLAS_CONNECT_TIMEOUT, (expectedConnectTimeoutMs / 1000) + " sec");
         properties.put(ATLAS_READ_TIMEOUT, (expectedReadTimeoutMs / 1000) + " sec");
 
-        properties.put(ATLAS_CONF_DIR, atlasConfDir);
-        properties.put(ATLAS_CONF_CREATE, "true");
-        properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
-        properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:6667");
-
         // WHEN
         // THEN
         testConnectAndReadTimeout(properties, expectedConnectTimeoutMs, expectedReadTimeoutMs);
@@ -163,11 +164,6 @@ public class TestReportLineageToAtlas {
 
     private void testConnectAndReadTimeout(Map<PropertyDescriptor, String> properties, Integer expectedConnectTimeout, Integer expectedReadTimeout) throws Exception {
         // GIVEN
-        properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
-        properties.put(ATLAS_URLS, "http://localhost:27000");
-        properties.put(ATLAS_USER, "admin");
-        properties.put(ATLAS_PASSWORD, "admin123");
-
         reportingContext = mock(ReportingContext.class);
         when(reportingContext.getProperties()).thenReturn(properties);
         when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(properties.get(invocation.getArguments()[0])));
@@ -200,4 +196,99 @@ public class TestReportLineageToAtlas {
         assertEquals(expectedConnectTimeout, actualConnectTimeout);
         assertEquals(expectedReadTimeout, actualReadTimeout);
     }
+
+    @Test
+    public void testNotificationSendingIsSynchronousWhenAtlasConfIsGenerated() throws Exception {
+        String atlasConfDir = createAtlasConfDir();
+
+        Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
+
+        testNotificationSendingIsSynchronous(properties);
+    }
+
+    @Test
+    public void testNotificationSendingIsSynchronousWhenAtlasConfIsProvidedAndSynchronousModeHasBeenSet() throws Exception {
+        String atlasConfDir = createAtlasConfDir();
+
+        Properties atlasConf = new Properties();
+        atlasConf.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
+        saveAtlasConf(atlasConfDir, atlasConf);
+
+        Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
+        properties.put(ATLAS_CONF_CREATE, "false");
+
+        testNotificationSendingIsSynchronous(properties);
+    }
+
+    /**
+     * Initializes and sets up the test subject with the given properties, then asserts that the loaded
+     * Atlas application properties resolve {@link AtlasHook#ATLAS_NOTIFICATION_ASYNCHRONOUS} to {@code false}.
+     */
+    private void testNotificationSendingIsSynchronous(Map<PropertyDescriptor, String> properties) throws Exception {
+        ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+
+        testSubject.initialize(initializationContext);
+        testSubject.setup(configurationContext);
+
+        Configuration atlasProperties = ApplicationProperties.get();
+        boolean isAsync = atlasProperties.getBoolean(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE);
+        assertFalse(isAsync);
+    }
+
+    @Test(expected = ProcessException.class)
+    public void testThrowExceptionWhenAtlasConfIsProvidedButSynchronousModeHasNotBeenSet() throws Exception {
+        String atlasConfDir = createAtlasConfDir();
+
+        Properties atlasConf = new Properties();
+        saveAtlasConf(atlasConfDir, atlasConf);
+
+        Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
+        properties.put(ATLAS_CONF_CREATE, "false");
+
+        ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+
+        testSubject.initialize(initializationContext);
+        testSubject.setup(configurationContext);
+    }
+
+    /**
+     * Creates the {@code target/atlasConfDir} directory if it does not exist yet and returns its relative path.
+     */
+    private String createAtlasConfDir() {
+        String atlasConfDir = "target/atlasConfDir";
+        File directory = new File(atlasConfDir);
+        if (!directory.exists()) {
+            directory.mkdirs();
+        }
+        return atlasConfDir;
+    }
+
+    /**
+     * Writes the given properties to the Atlas application properties file inside {@code atlasConfDir}.
+     * The stream is opened in try-with-resources so the file handle is always released, even if storing fails.
+     */
+    private void saveAtlasConf(String atlasConfDir, Properties atlasConf) throws IOException {
+        try (FileOutputStream fos = new FileOutputStream(atlasConfDir + File.separator + ApplicationProperties.APPLICATION_PROPERTIES)) {
+            atlasConf.store(fos, "Atlas test config");
+        }
+    }
+
+    /**
+     * Builds the baseline reporting task properties shared by the tests, using {@code atlasConfDir} as the Atlas
+     * configuration directory and setting {@code ATLAS_CONF_CREATE} to {@code "true"}; callers may override entries.
+     */
+    private Map<PropertyDescriptor, String> initReportingTaskProperties(String atlasConfDir) {
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+
+        properties.put(ATLAS_URLS, "http://localhost:21000");
+        properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
+        properties.put(ATLAS_CONF_DIR, atlasConfDir);
+        properties.put(ATLAS_CONF_CREATE, "true");
+        properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
+        properties.put(ATLAS_USER, "admin");
+        properties.put(ATLAS_PASSWORD, "password");
+        properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:9092");
+
+        return properties;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
index 927347d..8d845ce 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
@@ -16,3 +16,5 @@ atlas.cluster.name=AtlasCluster
 
 # atlas.kafka.bootstrap.servers=atlas.example.com:6667
 atlas.kafka.bootstrap.servers=localhost:9092
+
+atlas.notification.hook.asynchronous=false
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties
index d15debc..c0363c1 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 # see kafka.server.KafkaConfig for additional details and defaults
 
 ############################# Server Basics #############################
@@ -21,25 +22,26 @@ broker.id=0
 
 ############################# Socket Server Settings #############################
 
-# The port the socket server listens on
-port=9092
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-#host.name=localhost
+# The address the socket server listens on. It will get the value returned from 
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = listener_name://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
 
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured.  Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
-#advertised.host.name=<hostname routable by clients>
+# Hostname and port the broker will advertise to producers and consumers. If not set, 
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
 
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
-#advertised.port=<port accessible by clients>
+# Maps listener names to security protocols; by default each listener name is used as its own security protocol. See the config documentation for more details.
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
 
-# The number of threads handling network requests
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
 num.network.threads=3
 
-# The number of threads doing disk I/O
+# The number of threads that the server uses for processing requests, which may include disk I/O
 num.io.threads=8
 
 # The send buffer (SO_SNDBUF) used by the socket server
@@ -54,7 +56,7 @@ socket.request.max.bytes=104857600
 
 ############################# Log Basics #############################
 
-# A comma seperated list of directories under which to store log files
+# A comma separated list of directories under which to store log files
 log.dirs=target/kafka-tmp/kafka-logs
 
 # The default number of log partitions per topic. More partitions allow greater
@@ -66,6 +68,13 @@ num.partitions=1
 # This value is recommended to be increased for installations with data dirs located in RAID array.
 num.recovery.threads.per.data.dir=1
 
+############################# Internal Topic Settings  #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
 ############################# Log Flush Policy #############################
 
 # Messages are immediately written to the filesystem but by default we only fsync() to sync
@@ -73,7 +82,7 @@ num.recovery.threads.per.data.dir=1
 # There are a few important trade-offs here:
 #    1. Durability: Unflushed data may be lost if you are not using replication.
 #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
 # The settings below allow one to configure the flush policy to flush data after a period of time or
 # every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
@@ -90,11 +99,11 @@ num.recovery.threads.per.data.dir=1
 # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
 # from the end of the log.
 
-# The minimum age of a log file to be eligible for deletion
+# The minimum age of a log file to be eligible for deletion due to age
 log.retention.hours=168
 
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
 #log.retention.bytes=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
@@ -104,10 +113,6 @@ log.segment.bytes=1073741824
 # to the retention policies
 log.retention.check.interval.ms=300000
 
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
 ############################# Zookeeper #############################
 
 # Zookeeper connection string (see zookeeper docs for details).
@@ -119,3 +124,13 @@ zookeeper.connect=localhost:2181
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=6000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
index 96fef2d..9ffbf87 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
@@ -22,13 +22,13 @@
         <version>1.11.0-SNAPSHOT</version>
     </parent>
 
-    <groupId>org.apache.nifi</groupId>
     <artifactId>nifi-atlas-bundle</artifactId>
-    <version>1.11.0-SNAPSHOT</version>
     <packaging>pom</packaging>
+
     <properties>
-        <atlas.version>0.8.1</atlas.version>
+        <atlas.version>2.0.0</atlas.version>
     </properties>
+
     <modules>
         <module>nifi-atlas-reporting-task</module>
         <module>nifi-atlas-nar</module>
@@ -37,94 +37,20 @@
     <dependencyManagement>
         <dependencies>
             <dependency>
-                <!-- Explicitly force Netty to 3.7.1 due to CVE-2014-0193 -->
-                <groupId>io.netty</groupId>
-                <artifactId>netty</artifactId>
-                <version>3.7.1.Final</version>
-            </dependency>
-            <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-atlas-reporting-task</artifactId>
                 <version>1.11.0-SNAPSHOT</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.nifi</groupId>
-                <artifactId>nifi-client-dto</artifactId>
-                <version>1.11.0-SNAPSHOT</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.atlas</groupId>
-                <artifactId>atlas-client</artifactId>
-                <version>${atlas.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-log4j12</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>jul-to-slf4j</artifactId>
-                    </exclusion>
-                    <!--
-                        <exclusion>
-                            <groupId>log4j</groupId>
-                            <artifactId>log4j</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-minikdc</artifactId>
-                        </exclusion>
-                        <exclusion>
-                            <groupId>org.apache.hadoop</groupId>
-                            <artifactId>hadoop-annotations</artifactId>
-                        </exclusion>
-                        <!-
-                            Exclude these Atlas typesystem to reduce dependency size.
-                            Use atlas-intg and atlas-common instead.
-                        ->
-                        <exclusion>
-                            <groupId>org.apache.atlas</groupId>
-                            <artifactId>atlas-typesystem</artifactId>
-                        </exclusion>
-                    -->
-                </exclusions>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.atlas</groupId>
-                <artifactId>atlas-intg</artifactId>
-                <version>${atlas.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-log4j12</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>jul-to-slf4j</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>log4j</groupId>
-                        <artifactId>log4j</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
+
             <dependency>
                 <groupId>org.apache.atlas</groupId>
-                <artifactId>atlas-common</artifactId>
+                <artifactId>atlas-client-v2</artifactId>
                 <version>${atlas.version}</version>
                 <exclusions>
                     <exclusion>
                         <groupId>org.slf4j</groupId>
                         <artifactId>slf4j-log4j12</artifactId>
                     </exclusion>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>jul-to-slf4j</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>log4j</groupId>
-                        <artifactId>log4j</artifactId>
-                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
@@ -136,12 +62,10 @@
                         <groupId>org.slf4j</groupId>
                         <artifactId>slf4j-log4j12</artifactId>
                     </exclusion>
-                    <!--
                     <exclusion>
                         <groupId>log4j</groupId>
                         <artifactId>log4j</artifactId>
                     </exclusion>
-                    -->
                 </exclusions>
             </dependency>
         </dependencies>