You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2020/01/08 18:27:00 UTC
[nifi] branch master updated: NIFI-6939: Upgrade Atlas client
dependency to 2.0.0 NIFI-6939: Review changes
This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new cc74534 NIFI-6939: Upgrade Atlas client dependency to 2.0.0 NIFI-6939: Review changes
cc74534 is described below
commit cc74534bc0c4556816e2f3d9b04dbf76f73ab22c
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Thu Dec 19 16:53:42 2019 +0100
NIFI-6939: Upgrade Atlas client dependency to 2.0.0
NIFI-6939: Review changes
This closes #3944
---
.../nifi-atlas-reporting-task/pom.xml | 37 ++-----
.../org/apache/nifi/atlas/hook/NiFiAtlasHook.java | 21 ++--
.../apache/nifi/atlas/hook/NotificationSender.java | 66 +++++++-----
.../AbstractNiFiProvenanceEventAnalyzer.java | 2 +-
.../apache/nifi/atlas/provenance/DataSetRefs.java | 2 +-
.../provenance/analyzer/AbstractHiveAnalyzer.java | 2 +-
.../nifi/atlas/provenance/analyzer/FilePath.java | 2 +-
.../nifi/atlas/provenance/analyzer/HBaseTable.java | 2 +-
.../nifi/atlas/provenance/analyzer/HDFSPath.java | 2 +-
.../nifi/atlas/provenance/analyzer/Hive2JDBC.java | 2 +-
.../nifi/atlas/provenance/analyzer/KafkaTopic.java | 2 +-
.../atlas/provenance/analyzer/NiFiRemotePort.java | 2 +-
.../provenance/analyzer/NiFiRootGroupPort.java | 2 +-
.../provenance/analyzer/PutHiveStreaming.java | 2 +-
.../analyzer/unknown/UnknownDataSet.java | 2 +-
.../provenance/analyzer/unknown/UnknownInput.java | 2 +-
.../provenance/analyzer/unknown/UnknownOutput.java | 2 +-
.../lineage/AbstractLineageStrategy.java | 25 +++--
.../lineage/CompleteFlowPathLineage.java | 2 +-
.../atlas/provenance/lineage/LineageContext.java | 4 +-
.../provenance/lineage/SimpleFlowPathLineage.java | 2 +-
.../nifi/atlas/reporting/ReportLineageToAtlas.java | 71 +++++++------
.../org/apache/nifi/atlas/security/Kerberos.java | 18 ++--
.../atlas/emulator/AtlasAPIV2ServerEmulator.java | 23 ++--
.../emulator/AtlasNotificationServerEmulator.java | 8 +-
.../apache/nifi/atlas/emulator/EmbeddedKafka.java | 73 ++-----------
.../apache/nifi/atlas/hook/TestNiFiAtlasHook.java | 10 +-
.../nifi/atlas/hook/TestNotificationSender.java | 116 +++++++++++----------
.../atlas/provenance/analyzer/TestHBaseTable.java | 2 +-
.../atlas/provenance/analyzer/TestHDFSPath.java | 2 +-
.../atlas/provenance/analyzer/TestHive2JDBC.java | 2 +-
.../atlas/provenance/analyzer/TestKafkaTopic.java | 2 +-
.../provenance/analyzer/TestNiFiRemotePort.java | 2 +-
.../provenance/analyzer/TestNiFiRootGroupPort.java | 2 +-
.../provenance/analyzer/TestPutHiveStreaming.java | 2 +-
.../provenance/analyzer/TestUnknownDataSet.java | 2 +-
.../atlas/reporting/ITReportLineageToAtlas.java | 34 +++++-
.../atlas/reporting/TestReportLineageToAtlas.java | 109 ++++++++++++++++---
.../test/resources/atlas-application.properties | 2 +
.../src/test/resources/server.properties | 61 +++++++----
nifi-nar-bundles/nifi-atlas-bundle/pom.xml | 86 +--------------
41 files changed, 403 insertions(+), 409 deletions(-)
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
index 353cf51..d9d8e39 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/pom.xml
@@ -48,49 +48,24 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>
- <!-- Explicitly force beanutils 1.9.3 because versions prior to 1.9.2 had a vuln
- Can remove this once atlas client which depends on hadoop-common uses a more recent version -->
+
+ <!-- Explicitly force beanutils 1.9.4 in order to avoid vulnerabilities in earlier versions.
+ Can remove this once atlas client which depends on hadoop-common uses a more recent version. -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
+
<dependency>
<groupId>org.apache.atlas</groupId>
- <artifactId>atlas-client</artifactId>
- <!-- Exclude dependencies to reduce NAR file size -->
- <exclusions>
- <!-- NOTE: Scala is required by atlas notification -->
- <!--
- fastutil-6.5.16.jar is 16MB.
- 'fastutil' is only used by
- org.apache.atlas.repository.memory.AttributeStores
- which is deprecated as being part of V1 API.
- -->
- <exclusion>
- <groupId>it.unimi.dsi</groupId>
- <artifactId>fastutil</artifactId>
- </exclusion>
- <!-- Explicit dep referred to in POM above. commons-beanutils and commons-beanutils-core merged in 1.9.0 -->
- <exclusion>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils-core</artifactId>
- </exclusion>
- </exclusions>
+ <artifactId>atlas-client-v2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
- <!--
- NOTE: Could not use nifi-hadoop-libraries-nar because hadoop-client uses httpclient-4.2.5,
- but atlas-client uses httpclient-4.5.3.
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- -->
+
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
index 4962a70..3f778e3 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
@@ -17,7 +17,7 @@
package org.apache.nifi.atlas.hook;
import org.apache.atlas.hook.AtlasHook;
-import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.model.notification.HookNotification;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.provenance.lineage.LineageContext;
@@ -26,38 +26,29 @@ import java.util.List;
/**
* This class is not thread-safe as it holds uncommitted notification messages within instance.
- * {@link #addMessage(HookNotificationMessage)} and {@link #commitMessages()} should be used serially from a single thread.
+ * {@link #addMessage(HookNotification)} and {@link #commitMessages()} should be used serially from a single thread.
*/
public class NiFiAtlasHook extends AtlasHook implements LineageContext {
public static final String NIFI_USER = "nifi";
- private static final String CONF_PREFIX = "atlas.hook.nifi.";
- private static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
-
private NiFiAtlasClient atlasClient;
public void setAtlasClient(NiFiAtlasClient atlasClient) {
this.atlasClient = atlasClient;
}
- @Override
- protected String getNumberOfRetriesPropertyKey() {
- return HOOK_NUM_RETRIES;
- }
-
-
- private final List<HookNotificationMessage> messages = new ArrayList<>();
+ private final List<HookNotification> messages = new ArrayList<>();
@Override
- public void addMessage(HookNotificationMessage message) {
+ public void addMessage(HookNotification message) {
messages.add(message);
}
public void commitMessages() {
final NotificationSender notificationSender = createNotificationSender();
notificationSender.setAtlasClient(atlasClient);
- List<HookNotificationMessage> messagesBatch = new ArrayList<>(messages);
+ List<HookNotification> messagesBatch = new ArrayList<>(messages);
messages.clear();
notificationSender.send(messagesBatch, this::notifyEntities);
}
@@ -72,7 +63,7 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext {
return new NotificationSender();
}
- List<HookNotificationMessage> getMessages() {
+ List<HookNotification> getMessages() {
return messages;
}
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
index 66c21df..0ae711f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java
@@ -20,9 +20,11 @@ import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.util.Tuple;
@@ -39,23 +41,24 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.function.Consumer;
+import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_CREATE;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
+import static org.apache.atlas.model.notification.HookNotification.HookNotificationType.ENTITY_CREATE;
+import static org.apache.atlas.model.notification.HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
-import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_GUID;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_OUTPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_TYPENAME;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
/**
* This class implements Atlas hook notification message deduplication mechanism.
@@ -175,7 +178,7 @@ class NotificationSender {
* <p>Send hook notification messages.
* In order to notify relationships between 'nifi_flow_path' and its inputs/outputs, this method sends messages in following order:</p>
* <ol>
- * <li>As a a single {@link org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest} message:
+ * <li>As a single {@link org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest} message:
* <ul>
* <li>New entities except 'nifi_flow_path', including DataSets such as 'nifi_queue', 'kafka_topic' or 'hive_table' ... etc,
* so that 'nifi_flow_path' can refer</li>
@@ -190,21 +193,21 @@ class NotificationSender {
* @param messages list of messages to be sent
* @param notifier responsible for sending notification messages, its accept method can be called multiple times
*/
- void send(final List<HookNotification.HookNotificationMessage> messages, final Consumer<List<HookNotification.HookNotificationMessage>> notifier) {
+ void send(final List<HookNotification> messages, final BiConsumer<List<HookNotification>, UserGroupInformation> notifier) {
logger.info("Sending {} messages to Atlas", messages.size());
final Metrics metrics = new Metrics();
try {
metrics.totalMessages = messages.size();
- final Map<Boolean, List<HookNotification.HookNotificationMessage>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
+ final Map<Boolean, List<HookNotification>> createAndOthers = messages.stream().collect(groupingBy(msg -> ENTITY_CREATE.equals(msg.getType())));
- final List<HookNotification.HookNotificationMessage> creates = safeGet(createAndOthers, true);
+ final List<HookNotification> creates = safeGet(createAndOthers, true);
metrics.createMessages = creates.size();
final Map<Boolean, List<Referenceable>> newFlowPathsAndOtherEntities = creates.stream()
- .flatMap(msg -> ((HookNotification.EntityCreateRequest) msg).getEntities().stream())
- .collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.typeName)));
+ .flatMap(msg -> ((HookNotificationV1.EntityCreateRequest) msg).getEntities().stream())
+ .collect(groupingBy(ref -> TYPE_NIFI_FLOW_PATH.equals(ref.getTypeName())));
// Deduplicate same entity creation messages.
final List<Referenceable> newEntitiesExceptFlowPaths = safeGet(newFlowPathsAndOtherEntities, false)
@@ -227,28 +230,28 @@ class NotificationSender {
newEntities.addAll(newEntitiesExceptFlowPaths);
newEntities.addAll(newFlowPaths);
if (!newEntities.isEmpty()) {
- notifier.accept(Collections.singletonList(new HookNotification.EntityCreateRequest(NIFI_USER, newEntities)));
+ notifier.accept(Collections.singletonList(new HookNotificationV1.EntityCreateRequest(NIFI_USER, newEntities)), null);
}
- final Map<Boolean, List<HookNotification.HookNotificationMessage>> partialNiFiFlowPathUpdateAndOthers
+ final Map<Boolean, List<HookNotification>> partialNiFiFlowPathUpdateAndOthers
= safeGet(createAndOthers, false).stream().collect(groupingBy(msg
-> ENTITY_PARTIAL_UPDATE.equals(msg.getType())
- && TYPE_NIFI_FLOW_PATH.equals(((HookNotification.EntityPartialUpdateRequest)msg).getTypeName())
- && ATTR_QUALIFIED_NAME.equals(((HookNotification.EntityPartialUpdateRequest)msg).getAttribute())
+ && TYPE_NIFI_FLOW_PATH.equals(((HookNotificationV1.EntityPartialUpdateRequest)msg).getTypeName())
+ && ATTR_QUALIFIED_NAME.equals(((HookNotificationV1.EntityPartialUpdateRequest)msg).getAttribute())
));
// These updates are made against existing flow path entities.
- final List<HookNotification.HookNotificationMessage> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
- final List<HookNotification.HookNotificationMessage> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
+ final List<HookNotification> partialNiFiFlowPathUpdates = safeGet(partialNiFiFlowPathUpdateAndOthers, true);
+ final List<HookNotification> otherMessages = safeGet(partialNiFiFlowPathUpdateAndOthers, false);
metrics.partialNiFiFlowPathUpdates = partialNiFiFlowPathUpdates.size();
metrics.otherMessages = otherMessages.size();
// 2. Notify de-duplicated 'nifi_flow_path' updates
- final List<HookNotification.HookNotificationMessage> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotification.EntityPartialUpdateRequest) msg)
+ final List<HookNotification> deduplicatedMessages = partialNiFiFlowPathUpdates.stream().map(msg -> (HookNotificationV1.EntityPartialUpdateRequest) msg)
// Group by nifi_flow_path qualifiedName value.
- .collect(groupingBy(HookNotification.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
+ .collect(groupingBy(HookNotificationV1.EntityPartialUpdateRequest::getAttributeValue)).entrySet().stream()
.map(entry -> {
final String flowPathQualifiedName = entry.getKey();
final Map<String, Referenceable> distinctInputs;
@@ -274,7 +277,7 @@ class NotificationSender {
}
// Merge all inputs and outputs for this nifi_flow_path.
- for (HookNotification.EntityPartialUpdateRequest msg : entry.getValue()) {
+ for (HookNotificationV1.EntityPartialUpdateRequest msg : entry.getValue()) {
fromReferenceable(msg.getEntity().get(ATTR_INPUTS), metrics)
.entrySet().stream().filter(ref -> !distinctInputs.containsKey(ref.getKey()))
.forEach(ref -> distinctInputs.put(ref.getKey(), ref.getValue()));
@@ -290,17 +293,17 @@ class NotificationSender {
// org.json4s.package$MappingException: Can't find ScalaSig for class org.apache.atlas.typesystem.Referenceable
flowPathRef.set(ATTR_INPUTS, new ArrayList<>(distinctInputs.values()));
flowPathRef.set(ATTR_OUTPUTS, new ArrayList<>(distinctOutputs.values()));
- return new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+ return new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
ATTR_QUALIFIED_NAME, flowPathQualifiedName, flowPathRef);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
metrics.uniquePartialNiFiFlowPathUpdates = deduplicatedMessages.size();
- notifier.accept(deduplicatedMessages);
+ notifier.accept(deduplicatedMessages, null);
// 3. Notify other messages
- notifier.accept(otherMessages);
+ notifier.accept(otherMessages, null);
} finally {
logger.info(metrics.toLogString("Finished"));
@@ -380,7 +383,7 @@ class NotificationSender {
final String typedRefQualifiedName = toTypedQualifiedName(typeName, refQualifiedName);
final Referenceable refFromCacheIfAvailable = typedQualifiedNameToRef.computeIfAbsent(typedRefQualifiedName, k -> {
- if (id.isAssigned()) {
+ if (isAssigned(id)) {
// If this referenceable has Guid assigned, then add this one to cache.
guidToTypedQualifiedName.put(id._getId(), typedRefQualifiedName);
}
@@ -391,4 +394,15 @@ class NotificationSender {
}).filter(tuple -> tuple.getValue() != null)
.collect(toMap(Tuple::getKey, Tuple::getValue));
}
+
+ // Copy of org.apache.atlas.typesystem.persistence.Id.isAssigned() from v0.8.1. This method does not exist in v2.0.0.
+ private boolean isAssigned(Id id) {
+ try {
+ UUID.fromString(id.getId());
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+
+ return true;
+ }
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
index 069276a..9269fdf 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/AbstractNiFiProvenanceEventAnalyzer.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.provenance.ProvenanceEventType;
import java.net.MalformedURLException;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
index 4f745d1..4b038e4 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/DataSetRefs.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import java.util.Collections;
import java.util.LinkedHashSet;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
index 879b2ee..3b3b105 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/AbstractHiveAnalyzer.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.util.Tuple;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
index 37df736..7a1a589 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/FilePath.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
index 0f446fb..c9e7400 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HBaseTable.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
index b1ef828..5cf77ab 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/HDFSPath.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
index ccbbc66..ffed41f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/Hive2JDBC.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
index ff86166..acdaef4 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/KafkaTopic.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
index 69b2d47..f7ea33e 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRemotePort.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.controller.status.ConnectionStatus;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
index e791c62..75ebb44 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/NiFiRootGroupPort.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.controller.status.ConnectionStatus;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
index 7b41c57..78c37ea 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/PutHiveStreaming.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
index 42e407d..0117b19 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownDataSet.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer.unknown;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AbstractNiFiProvenanceEventAnalyzer;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
index f16908b..850f4cf 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownInput.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer.unknown;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
index 5d564c2..9fc9253 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/analyzer/unknown/UnknownOutput.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer.unknown;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.provenance.ProvenanceEventRecord;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
index a253c7d..ba53bf0 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/AbstractLineageStrategy.java
@@ -17,8 +17,9 @@
package org.apache.nifi.atlas.provenance.lineage;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;
@@ -40,7 +41,6 @@ import java.util.stream.Collectors;
import static org.apache.nifi.atlas.AtlasUtils.toStr;
import static org.apache.nifi.atlas.AtlasUtils.toTypedQualifiedName;
-import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_INPUTS;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_NIFI_FLOW;
@@ -49,6 +49,7 @@ import static org.apache.nifi.atlas.NiFiTypes.ATTR_QUALIFIED_NAME;
import static org.apache.nifi.atlas.NiFiTypes.ATTR_URL;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW;
import static org.apache.nifi.atlas.NiFiTypes.TYPE_NIFI_FLOW_PATH;
+import static org.apache.nifi.atlas.hook.NiFiAtlasHook.NIFI_USER;
public abstract class AbstractLineageStrategy implements LineageStrategy {
@@ -131,7 +132,7 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
}
protected void createEntity(Referenceable ... entities) {
- final HookNotification.EntityCreateRequest msg = new HookNotification.EntityCreateRequest(NIFI_USER, entities);
+ final HookNotificationV1.EntityCreateRequest msg = new HookNotificationV1.EntityCreateRequest(NIFI_USER, entities);
lineageContext.addMessage(msg);
}
@@ -146,11 +147,11 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
refsToAdd.stream().filter(ref -> !existingRefTypedQualifiedNames.contains(toTypedQualifiedName.apply(ref)))
.forEach(ref -> {
- if (ref.getId().isUnassigned()) {
+ if (isUnassigned(ref.getId())) {
// Create new entity.
logger.debug("Found a new DataSet reference from {} to {}, sending an EntityCreateRequest",
new Object[]{toTypedQualifiedName.apply(nifiFlowPath), toTypedQualifiedName.apply(ref)});
- final HookNotification.EntityCreateRequest createDataSet = new HookNotification.EntityCreateRequest(NIFI_USER, ref);
+ final HookNotificationV1.EntityCreateRequest createDataSet = new HookNotificationV1.EntityCreateRequest(NIFI_USER, ref);
lineageContext.addMessage(createDataSet);
}
refs.add(ref);
@@ -169,10 +170,18 @@ public abstract class AbstractLineageStrategy implements LineageStrategy {
final boolean inputsAdded = addDataSetRefs(dataSetRefs.getInputs(), flowPathRef, ATTR_INPUTS);
final boolean outputsAdded = addDataSetRefs(dataSetRefs.getOutputs(), flowPathRef, ATTR_OUTPUTS);
if (inputsAdded || outputsAdded) {
- lineageContext.addMessage(new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
+ lineageContext.addMessage(new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH,
ATTR_QUALIFIED_NAME, (String) flowPathRef.get(ATTR_QUALIFIED_NAME), flowPathRef));
}
}
-
+ // Copy of org.apache.atlas.typesystem.persistence.Id.isUnassigned() from v0.8.1. This method does not exist in v2.0.0. Returns true only when the id string parses as a negative long.
+ private boolean isUnassigned(Id id) {
+ try {
+ long l = Long.parseLong(id.getId()); // throws NumberFormatException when the id string is not a valid long
+ return l < 0; // NOTE(review): presumably Atlas uses negative numeric ids as placeholders for entities not yet persisted - confirm
+ } catch (NumberFormatException ne) {
+ return false; // a non-numeric id is reported as assigned
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
index d3ac658..03c9eaa 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/CompleteFlowPathLineage.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.lineage;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
index 060cfbe..32405ed 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/LineageContext.java
@@ -16,8 +16,8 @@
*/
package org.apache.nifi.atlas.provenance.lineage;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.HookNotification;
public interface LineageContext {
- void addMessage(HookNotification.HookNotificationMessage message);
+ void addMessage(HookNotification message);
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
index 7ecbed5..4d86c78 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/provenance/lineage/SimpleFlowPathLineage.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.lineage;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowPath;
import org.apache.nifi.atlas.provenance.AnalysisContext;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index 80bad7a..1560c7d 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -16,36 +16,10 @@
*/
package org.apache.nifi.atlas.reporting;
-import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
-import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.ServiceLoader;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.Stream;
-
+import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.hook.AtlasHook;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
@@ -57,10 +31,10 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.atlas.NiFiFlow;
import org.apache.nifi.atlas.NiFiFlowAnalyzer;
+import org.apache.nifi.atlas.hook.NiFiAtlasHook;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.StandardAnalysisContext;
import org.apache.nifi.atlas.provenance.lineage.CompleteFlowPathLineage;
@@ -81,8 +55,8 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -93,7 +67,33 @@ import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.ssl.SSLContextService;
-import com.sun.jersey.api.client.ClientResponse;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_BATCH_SIZE;
+import static org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer.PROVENANCE_START_POSITION;
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
@@ -552,6 +552,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
// Create Atlas configuration file if necessary.
if (createAtlasConf) {
+ // enforce synchronous notification sending (needed for the checkpointing in ProvenanceEventConsumer)
+ atlasProperties.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
atlasProperties.put(ATLAS_PROPERTY_CLIENT_CONNECT_TIMEOUT_MS, atlasConnectTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
@@ -568,6 +570,13 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.format(Instant.now());
atlasProperties.store(fos, "Generated by Apache NiFi ReportLineageToAtlas ReportingTask at " + ts);
}
+ } else {
+ // check if synchronous notification sending has been set (needed for the checkpointing in ProvenanceEventConsumer)
+ String isAsync = atlasProperties.getProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS);
+ if (isAsync == null || !isAsync.equalsIgnoreCase("false")) {
+ throw new ProcessException("Atlas property '" + AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS + "' must be set to 'false' in " + ATLAS_PROPERTIES_FILENAME + "." +
+ " Sending notifications asynchronously is not supported by the reporting task.");
+ }
}
getLogger().debug("Force reloading Atlas application properties.");
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
index 41a2966..021a860 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/security/Kerberos.java
@@ -16,15 +16,6 @@
*/
package org.apache.nifi.atlas.security;
-import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
-import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
import org.apache.atlas.AtlasClientV2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
@@ -34,6 +25,15 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kerberos.KerberosCredentialsService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_KEYTAB;
+import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.NIFI_KERBEROS_PRINCIPAL;
+
public class Kerberos implements AtlasAuthN {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
index b147810..0de6e8f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
@@ -22,10 +22,11 @@ import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiTypes;
+import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
@@ -94,19 +95,19 @@ public class AtlasAPIV2ServerEmulator {
server.start();
logger.info("Starting {} on port {}", AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort());
- embeddedKafka = new EmbeddedKafka(false);
+ embeddedKafka = new EmbeddedKafka();
embeddedKafka.start();
notificationServerEmulator.consume(m -> {
- if (m instanceof HookNotification.EntityCreateRequest) {
- HookNotification.EntityCreateRequest em = (HookNotification.EntityCreateRequest) m;
+ if (m instanceof HookNotificationV1.EntityCreateRequest) {
+ HookNotificationV1.EntityCreateRequest em = (HookNotificationV1.EntityCreateRequest) m;
for (Referenceable ref : em.getEntities()) {
final AtlasEntity entity = toEntity(ref);
createEntityByNotification(entity);
}
- } else if (m instanceof HookNotification.EntityPartialUpdateRequest) {
- HookNotification.EntityPartialUpdateRequest em
- = (HookNotification.EntityPartialUpdateRequest) m;
+ } else if (m instanceof HookNotificationV1.EntityPartialUpdateRequest) {
+ HookNotificationV1.EntityPartialUpdateRequest em
+ = (HookNotificationV1.EntityPartialUpdateRequest) m;
final AtlasEntity entity = toEntity(em.getEntity());
entity.setAttribute(em.getAttribute(), em.getAttributeValue());
updateEntityByNotification(entity);
@@ -241,7 +242,11 @@ public class AtlasAPIV2ServerEmulator {
}
private static <T> T readInputJSON(HttpServletRequest req, Class<? extends T> clazz) throws IOException {
- return new ObjectMapper().reader().withType(clazz).readValue(req.getInputStream());
+ return new ObjectMapper()
+ .configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .reader()
+ .withType(clazz)
+ .readValue(req.getInputStream());
}
private static final AtlasTypesDef atlasTypesDef = new AtlasTypesDef();
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java
index 9e3a787..e317923 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasNotificationServerEmulator.java
@@ -16,9 +16,9 @@
*/
package org.apache.nifi.atlas.emulator;
+import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.MessageDeserializer;
import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,7 +36,7 @@ public class AtlasNotificationServerEmulator {
private volatile boolean isStopped;
- public void consume(Consumer<HookNotification.HookNotificationMessage> c) {
+ public void consume(Consumer<HookNotification> c) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
@@ -53,8 +53,8 @@ public class AtlasNotificationServerEmulator {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
final MessageDeserializer deserializer = NotificationInterface.NotificationType.HOOK.getDeserializer();
- final HookNotification.HookNotificationMessage m
- = (HookNotification.HookNotificationMessage) deserializer.deserialize(record.value());
+ final HookNotification m
+ = (HookNotification) deserializer.deserialize(record.value());
c.accept(m);
}
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
index 40ec052..9338a55 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
@@ -28,8 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
import java.util.Properties;
/**
@@ -47,10 +45,6 @@ public class EmbeddedKafka {
private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
- private final int kafkaPort;
-
- private final int zookeeperPort;
-
private boolean started;
/**
@@ -58,8 +52,8 @@ public class EmbeddedKafka {
* configuration properties will be loaded from 'server.properties' and
* 'zookeeper.properties' located at the root of the classpath.
*/
- public EmbeddedKafka(boolean useRandomPort) {
- this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"), useRandomPort);
+ public EmbeddedKafka() {
+ this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties"));
}
/**
@@ -70,47 +64,14 @@ public class EmbeddedKafka {
* @param zookeeperConfig
* Zookeeper configuration properties
*/
- public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig, boolean useRandomPort) {
+ public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
this.cleanupKafkaWorkDir();
- this.zookeeperConfig = zookeeperConfig;
- this.kafkaConfig = kafkaConfig;
- if (useRandomPort) {
- this.kafkaPort = this.availablePort();
- this.zookeeperPort = this.availablePort();
-
- this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
- this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + this.zookeeperPort);
- this.zookeeperConfig.setProperty("clientPort", String.valueOf(this.zookeeperPort));
- } else {
- this.kafkaPort = Integer.parseInt(kafkaConfig.getProperty("port"));
- this.zookeeperPort = Integer.parseInt(zookeeperConfig.getProperty("clientPort"));
- }
+ this.kafkaConfig = kafkaConfig;
+ this.zookeeperConfig = zookeeperConfig;
- this.zkServer = new ZooKeeperServer();
this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig));
- }
-
- /**
- *
- * @return port for Kafka server
- */
- public int getKafkaPort() {
- if (!this.started) {
- throw new IllegalStateException("Kafka server is not started. Kafka port can't be determined.");
- }
- return this.kafkaPort;
- }
-
- /**
- *
- * @return port for Zookeeper server
- */
- public int getZookeeperPort() {
- if (!this.started) {
- throw new IllegalStateException("Kafka server is not started. Zookeeper port can't be determined.");
- }
- return this.zookeeperPort;
+ this.zkServer = new ZooKeeperServer();
}
/**
@@ -127,7 +88,7 @@ public class EmbeddedKafka {
logger.info("Starting Kafka server");
this.kafkaServer.startup();
- logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.serverConfig().port()
+ logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.staticServerConfig().port()
+ ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect"));
this.started = true;
}
@@ -209,24 +170,4 @@ public class EmbeddedKafka {
throw new IllegalStateException(e);
}
}
-
- /**
- * Will determine the available port used by Kafka/Zookeeper servers.
- */
- private int availablePort() {
- ServerSocket s = null;
- try {
- s = new ServerSocket(0);
- s.setReuseAddress(true);
- return s.getLocalPort();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to discover available port.", e);
- } finally {
- try {
- s.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
index 98fd11e..bd7dca6 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNiFiAtlasHook.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.hook;
-import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.model.notification.HookNotification;
import org.junit.Before;
import org.junit.Test;
@@ -40,16 +40,16 @@ public class TestNiFiAtlasHook {
@Test
public void messagesListShouldContainMessagesAfterAddMessage() {
- hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
- hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
+ hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
+ hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
assertEquals(2, hook.getMessages().size());
}
@Test
public void messagesListShouldBeCleanedUpAfterCommit() {
- hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
- hook.addMessage(new HookNotification.HookNotificationMessage(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
+ hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_CREATE, "nifi"));
+ hook.addMessage(new HookNotification(HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE, "nifi"));
hook.commitMessages();
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
index 7d08810..2a9e768 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/hook/TestNotificationSender.java
@@ -19,8 +19,10 @@ package org.apache.nifi.atlas.hook;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.notification.HookNotificationV1;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.atlas.AtlasUtils;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.junit.Test;
@@ -34,7 +36,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
@@ -60,10 +62,10 @@ public class TestNotificationSender {
private static final Logger logger = LoggerFactory.getLogger(TestNotificationSender.class);
- private static class Notifier implements Consumer<List<HookNotification.HookNotificationMessage>> {
- private final List<List<HookNotification.HookNotificationMessage>> notifications = new ArrayList<>();
+ private static class Notifier implements BiConsumer<List<HookNotification>, UserGroupInformation> {
+ private final List<List<HookNotification>> notifications = new ArrayList<>();
@Override
- public void accept(List<HookNotification.HookNotificationMessage> messages) {
+ public void accept(List<HookNotification> messages, UserGroupInformation ugi) {
logger.info("notified at {}, {}", notifications.size(), messages);
notifications.add(messages);
}
@@ -72,7 +74,7 @@ public class TestNotificationSender {
@Test
public void testZeroMessage() {
final NotificationSender sender = new NotificationSender();
- final List<HookNotification.HookNotificationMessage> messages = Collections.emptyList();
+ final List<HookNotification> messages = Collections.emptyList();
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
assertEquals(0, notifier.notifications.get(0).size());
@@ -88,9 +90,9 @@ public class TestNotificationSender {
@SuppressWarnings("unchecked")
private void assertCreateMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
assertTrue(notifier.notifications.size() > notificationIndex);
- final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+ final List<HookNotification> messages = notifier.notifications.get(notificationIndex);
assertEquals(1, messages.size());
- final HookNotification.EntityCreateRequest message = (HookNotification.EntityCreateRequest) messages.get(0);
+ final HookNotificationV1.EntityCreateRequest message = (HookNotificationV1.EntityCreateRequest) messages.get(0);
assertEquals(expects.length, message.getEntities().size());
// The use of 'flatMap' at NotificationSender does not preserve actual entities order.
@@ -135,11 +137,11 @@ public class TestNotificationSender {
@SuppressWarnings("unchecked")
private void assertUpdateFlowPathMessage(Notifier notifier, int notificationIndex, Referenceable ... expects) {
assertTrue(notifier.notifications.size() > notificationIndex);
- final List<HookNotification.HookNotificationMessage> messages = notifier.notifications.get(notificationIndex);
+ final List<HookNotification> messages = notifier.notifications.get(notificationIndex);
assertEquals(expects.length, messages.size());
for (int i = 0; i < expects.length; i++) {
final Referenceable expect = expects[i];
- final HookNotification.EntityPartialUpdateRequest actual = (HookNotification.EntityPartialUpdateRequest) messages.get(i);
+ final HookNotificationV1.EntityPartialUpdateRequest actual = (HookNotificationV1.EntityPartialUpdateRequest) messages.get(i);
assertEquals(expect.getTypeName(), actual.getTypeName());
assertEquals(ATTR_QUALIFIED_NAME, actual.getAttribute());
assertEquals(expect.get(ATTR_QUALIFIED_NAME), actual.getAttributeValue());
@@ -155,8 +157,8 @@ public class TestNotificationSender {
public void testOneCreateDataSetMessage() {
final NotificationSender sender = new NotificationSender();
final Referenceable queue1 = createRef(TYPE_NIFI_QUEUE, "queue1@test");
- final List<HookNotification.HookNotificationMessage> messages = Collections.singletonList(
- new HookNotification.EntityCreateRequest(NIFI_USER, queue1));
+ final List<HookNotification> messages = Collections.singletonList(
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, queue1));
final Notifier notifier = new Notifier();
sender.send(messages, notifier);
@@ -275,46 +277,46 @@ public class TestNotificationSender {
ff4_pathB22.set(ATTR_OUTPUTS, singleton(ff4_data22));
- final List<HookNotification.HookNotificationMessage> messages = asList(
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data1),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_data12),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathA12),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_fork12),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff1_pathB12),
-
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data2),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_data22),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathA22),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_fork22),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff2_pathB22),
-
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data1),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_data12),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathA12),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_fork12),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB11),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff3_pathB12),
-
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data2),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_data22),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathA22),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_fork22),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB21),
- new HookNotification.EntityCreateRequest(NIFI_USER, ff4_pathB22)
+ final List<HookNotification> messages = asList(
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data1),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_data12),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathA11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathA12),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_fork11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_fork12),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathB11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff1_pathB12),
+
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data2),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_data22),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathA21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathA22),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_fork21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_fork22),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathB21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff2_pathB22),
+
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data1),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_data12),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathA11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathA12),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_fork11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_fork12),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathB11),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff3_pathB12),
+
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data2),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_data22),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathA21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathA22),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_fork21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_fork22),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathB21),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, ff4_pathB22)
);
final Notifier notifier = new Notifier();
@@ -376,10 +378,10 @@ public class TestNotificationSender {
newPath1Lineage.set(ATTR_INPUTS, singleton(fileC));
newPath1Lineage.set(ATTR_OUTPUTS, singleton(fileD));
- final List<HookNotification.HookNotificationMessage> messages = asList(
- new HookNotification.EntityCreateRequest(NIFI_USER, fileC),
- new HookNotification.EntityCreateRequest(NIFI_USER, fileD),
- new HookNotification.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
+ final List<HookNotification> messages = asList(
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, fileC),
+ new HookNotificationV1.EntityCreateRequest(NIFI_USER, fileD),
+ new HookNotificationV1.EntityPartialUpdateRequest(NIFI_USER, TYPE_NIFI_FLOW_PATH, ATTR_QUALIFIED_NAME, "path1@test", newPath1Lineage)
);
final NiFiAtlasClient atlasClient = mock(NiFiAtlasClient.class);
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
index dddea2a..5217e74 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHBaseTable.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
index c02b72a..3d94a33 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHDFSPath.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
index 2f64afb..5d5fcd6 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestHive2JDBC.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
index cbe805c..b8a68fb 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestKafkaTopic.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
index 4585826..e4799b3 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRemotePort.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
index a09f207..84777af 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestNiFiRootGroupPort.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
index c816348..5184025 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestPutHiveStreaming.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
index 01616b3..96623a5 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestUnknownDataSet.java
@@ -16,7 +16,7 @@
*/
package org.apache.nifi.atlas.provenance.analyzer;
-import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.v1.model.instance.Referenceable;
import org.apache.nifi.atlas.provenance.AnalysisContext;
import org.apache.nifi.atlas.provenance.DataSetRefs;
import org.apache.nifi.atlas.provenance.NiFiProvenanceEventAnalyzer;
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
index 31293bb..55d45a2 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/ITReportLineageToAtlas.java
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceRepository;
@@ -123,7 +124,38 @@ public class ITReportLineageToAtlas {
throw new RuntimeException("Failed to parse template", e);
}
- return handler.getRootProcessGroupStatus();
+ ProcessGroupStatus rootPgStatus = handler.getRootProcessGroupStatus();
+
+ for (ConnectionStatus connectionStatus : rootPgStatus.getConnectionStatus()) {
+ connectionStatus.setSourceName(lookupComponentName(rootPgStatus, connectionStatus.getSourceId()));
+ connectionStatus.setDestinationName(lookupComponentName(rootPgStatus, connectionStatus.getDestinationId()));
+ }
+
+ return rootPgStatus;
+ }
+
+ private String lookupComponentName(ProcessGroupStatus rootPgStatus, String componentId) {
+ for (ProcessorStatus processorStatus : rootPgStatus.getProcessorStatus()) {
+ if (processorStatus.getId().equals(componentId)) {
+ return processorStatus.getName();
+ }
+ }
+ for (PortStatus portStatus : rootPgStatus.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return portStatus.getName();
+ }
+ }
+ for (PortStatus portStatus : rootPgStatus.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return portStatus.getName();
+ }
+ }
+ for (RemoteProcessGroupStatus remoteProcessGroupStatus : rootPgStatus.getRemoteProcessGroupStatus()) {
+ if (remoteProcessGroupStatus.getId().equals(componentId)) {
+ return remoteProcessGroupStatus.getName();
+ }
+ }
+ return null; // funnels do not have names
}
private static class TemplateContentHander implements ContentHandler {
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
index 18a4f37..bb4297f 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/reporting/TestReportLineageToAtlas.java
@@ -17,11 +17,15 @@
package org.apache.nifi.atlas.reporting;
import com.sun.jersey.api.client.Client;
+import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.hook.AtlasHook;
+import org.apache.commons.configuration.Configuration;
import org.apache.nifi.atlas.NiFiAtlasClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.util.MockComponentLog;
@@ -35,11 +39,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -54,6 +61,7 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.KAFKA_BOOTSTRAP_SERVERS;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -128,7 +136,9 @@ public class TestReportLineageToAtlas {
@Test
public void testDefaultConnectAndReadTimeout() throws Exception {
// GIVEN
- Map<PropertyDescriptor, String> properties = new HashMap<>();
+ String atlasConfDir = createAtlasConfDir();
+
+ Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
// WHEN
// THEN
@@ -141,21 +151,12 @@ public class TestReportLineageToAtlas {
int expectedConnectTimeoutMs = 10000;
int expectedReadTimeoutMs = 5000;
- String atlasConfDir = "target/atlasConfDir";
- File directory = new File(atlasConfDir);
- if (!directory.exists()) {
- directory.mkdirs();
- }
+ String atlasConfDir = createAtlasConfDir();
- Map<PropertyDescriptor, String> properties = new HashMap<>();
+ Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
properties.put(ATLAS_CONNECT_TIMEOUT, (expectedConnectTimeoutMs / 1000) + " sec");
properties.put(ATLAS_READ_TIMEOUT, (expectedReadTimeoutMs / 1000) + " sec");
- properties.put(ATLAS_CONF_DIR, atlasConfDir);
- properties.put(ATLAS_CONF_CREATE, "true");
- properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
- properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:6667");
-
// WHEN
// THEN
testConnectAndReadTimeout(properties, expectedConnectTimeoutMs, expectedReadTimeoutMs);
@@ -163,11 +164,6 @@ public class TestReportLineageToAtlas {
private void testConnectAndReadTimeout(Map<PropertyDescriptor, String> properties, Integer expectedConnectTimeout, Integer expectedReadTimeout) throws Exception {
// GIVEN
- properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
- properties.put(ATLAS_URLS, "http://localhost:27000");
- properties.put(ATLAS_USER, "admin");
- properties.put(ATLAS_PASSWORD, "admin123");
-
reportingContext = mock(ReportingContext.class);
when(reportingContext.getProperties()).thenReturn(properties);
when(reportingContext.getProperty(any())).then(invocation -> new MockPropertyValue(properties.get(invocation.getArguments()[0])));
@@ -200,4 +196,83 @@ public class TestReportLineageToAtlas {
assertEquals(expectedConnectTimeout, actualConnectTimeout);
assertEquals(expectedReadTimeout, actualReadTimeout);
}
+
+ @Test
+ public void testNotificationSendingIsSynchronousWhenAtlasConfIsGenerated() throws Exception {
+ String atlasConfDir = createAtlasConfDir();
+
+ Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
+
+ testNotificationSendingIsSynchronous(properties);
+ }
+
+ @Test
+ public void testNotificationSendingIsSynchronousWhenAtlasConfIsProvidedAndSynchronousModeHasBeenSet() throws Exception {
+ String atlasConfDir = createAtlasConfDir();
+
+ Properties atlasConf = new Properties();
+ atlasConf.setProperty(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, "false");
+ saveAtlasConf(atlasConfDir, atlasConf);
+
+ Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
+ properties.put(ATLAS_CONF_CREATE, "false");
+
+ testNotificationSendingIsSynchronous(properties);
+ }
+
+ private void testNotificationSendingIsSynchronous(Map<PropertyDescriptor, String> properties) throws Exception {
+ ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+
+ testSubject.initialize(initializationContext);
+ testSubject.setup(configurationContext);
+
+ Configuration atlasProperties = ApplicationProperties.get();
+ boolean isAsync = atlasProperties.getBoolean(AtlasHook.ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE);
+ assertFalse(isAsync);
+ }
+
+ @Test(expected = ProcessException.class)
+ public void testThrowExceptionWhenAtlasConfIsProvidedButSynchronousModeHasNotBeenSet() throws Exception {
+ String atlasConfDir = createAtlasConfDir();
+
+ Properties atlasConf = new Properties();
+ saveAtlasConf(atlasConfDir, atlasConf);
+
+ Map<PropertyDescriptor, String> properties = initReportingTaskProperties(atlasConfDir);
+ properties.put(ATLAS_CONF_CREATE, "false");
+
+ ConfigurationContext configurationContext = new MockConfigurationContext(properties, null);
+
+ testSubject.initialize(initializationContext);
+ testSubject.setup(configurationContext);
+ }
+
+ private String createAtlasConfDir() {
+ String atlasConfDir = "target/atlasConfDir";
+ File directory = new File(atlasConfDir);
+ if (!directory.exists()) {
+ directory.mkdirs();
+ }
+ return atlasConfDir;
+ }
+
+ /**
+ * Writes the given Atlas configuration into the Atlas application properties file
+ * located in the supplied configuration directory.
+ *
+ * @param atlasConfDir directory in which the properties file is created
+ * @param atlasConf properties to store in the file
+ * @throws IOException if the file cannot be opened or written
+ */
+ private void saveAtlasConf(String atlasConfDir, Properties atlasConf) throws IOException {
+ // Properties.store() flushes but does not close the stream, so close it explicitly to avoid leaking the file handle.
+ try (FileOutputStream fos = new FileOutputStream(atlasConfDir + File.separator + ApplicationProperties.APPLICATION_PROPERTIES)) {
+ atlasConf.store(fos, "Atlas test config");
+ }
+ }
+
+ private Map<PropertyDescriptor, String> initReportingTaskProperties(String atlasConfDir) {
+ Map<PropertyDescriptor, String> properties = new HashMap<>();
+
+ properties.put(ATLAS_URLS, "http://localhost:21000");
+ properties.put(ATLAS_NIFI_URL, "http://localhost:8080/nifi");
+ properties.put(ATLAS_CONF_DIR, atlasConfDir);
+ properties.put(ATLAS_CONF_CREATE, "true");
+ properties.put(ATLAS_DEFAULT_CLUSTER_NAME, "defaultClusterName");
+ properties.put(ATLAS_USER, "admin");
+ properties.put(ATLAS_PASSWORD, "password");
+ properties.put(KAFKA_BOOTSTRAP_SERVERS, "http://localhost:9092");
+
+ return properties;
+ }
}
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
index 927347d..8d845ce 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/atlas-application.properties
@@ -16,3 +16,5 @@ atlas.cluster.name=AtlasCluster
# atlas.kafka.bootstrap.servers=atlas.example.com:6667
atlas.kafka.bootstrap.servers=localhost:9092
+
+atlas.notification.hook.asynchronous=false
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties
index d15debc..c0363c1 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/resources/server.properties
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
@@ -21,25 +22,26 @@ broker.id=0
############################# Socket Server Settings #############################
-# The port the socket server listens on
-port=9092
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-#host.name=localhost
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+# FORMAT:
+# listeners = listener_name://host_name:port
+# EXAMPLE:
+# listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured. Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
-#advertised.host.name=<hostname routable by clients>
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured. Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
-#advertised.port=<port accessible by clients>
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-# The number of threads handling network requests
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
-# The number of threads doing disk I/O
+# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
@@ -54,7 +56,7 @@ socket.request.max.bytes=104857600
############################# Log Basics #############################
-# A comma seperated list of directories under which to store log files
+# A comma separated list of directories under which to store log files
log.dirs=target/kafka-tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
@@ -66,6 +68,13 @@ num.partitions=1
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
@@ -73,7 +82,7 @@ num.recovery.threads.per.data.dir=1
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
@@ -90,11 +99,11 @@ num.recovery.threads.per.data.dir=1
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
-# The minimum age of a log file to be eligible for deletion
+# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
@@ -104,10 +113,6 @@ log.segment.bytes=1073741824
# to the retention policies
log.retention.check.interval.ms=300000
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
@@ -119,3 +124,13 @@ zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
+
+
+############################# Group Coordinator Settings #############################
+
+# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
+# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
+# The default value for this is 3 seconds.
+# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
+# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
+group.initial.rebalance.delay.ms=0
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
index 96fef2d..9ffbf87 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
@@ -22,13 +22,13 @@
<version>1.11.0-SNAPSHOT</version>
</parent>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-atlas-bundle</artifactId>
- <version>1.11.0-SNAPSHOT</version>
<packaging>pom</packaging>
+
<properties>
- <atlas.version>0.8.1</atlas.version>
+ <atlas.version>2.0.0</atlas.version>
</properties>
+
<modules>
<module>nifi-atlas-reporting-task</module>
<module>nifi-atlas-nar</module>
@@ -37,94 +37,20 @@
<dependencyManagement>
<dependencies>
<dependency>
- <!-- Explicitly force Netty to 3.7.1 due to CVE-2014-0193 -->
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.7.1.Final</version>
- </dependency>
- <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-atlas-reporting-task</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-client-dto</artifactId>
- <version>1.11.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-client</artifactId>
- <version>${atlas.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jul-to-slf4j</artifactId>
- </exclusion>
- <!--
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-minikdc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- </exclusion>
- <!-
- Exclude these Atlas typesystem to reduce dependency size.
- Use atlas-intg and atlas-common instead.
- ->
- <exclusion>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-typesystem</artifactId>
- </exclusion>
- -->
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.atlas</groupId>
- <artifactId>atlas-intg</artifactId>
- <version>${atlas.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jul-to-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
+
<dependency>
<groupId>org.apache.atlas</groupId>
- <artifactId>atlas-common</artifactId>
+ <artifactId>atlas-client-v2</artifactId>
<version>${atlas.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jul-to-slf4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -136,12 +62,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- <!--
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
- -->
</exclusions>
</dependency>
</dependencies>