You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/12/03 19:43:05 UTC
[nifi] 01/02: NIFI-8060 Added minimal VolatileProvenanceRepository
to nifi-stateless-engine and remove dependency on
nifi-volatile-provenance-repo module
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 8ac8a2bd1fbf75b8458554988eb1b3b1851b25d9
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Wed Dec 2 14:57:49 2020 -0500
NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateless-engine and remove dependency on nifi-volatile-provenance-repo module
---
.../nifi-stateless-engine/pom.xml | 9 -
.../flow/StandardStatelessDataflowFactory.java | 4 +-
.../repository/VolatileProvenanceRepository.java | 409 +++++++++++++++++++++
.../nifi-stateless-nar/pom.xml | 9 -
4 files changed, 411 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
index 000e9fb..9d29e05 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
@@ -91,15 +91,6 @@
<version>${jackson.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-volatile-provenance-repository</artifactId>
- <version>1.13.0-SNAPSHOT</version>
- <scope>compile</scope>
- <optional>true</optional>
- </dependency>
-
-
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 02a4b34..061d840 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -47,7 +47,6 @@ import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.provenance.IdentifierLookup;
import org.apache.nifi.provenance.ProvenanceRepository;
-import org.apache.nifi.provenance.VolatileProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
@@ -75,6 +74,7 @@ import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.apache.nifi.stateless.repository.VolatileProvenanceRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,7 +127,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
processScheduler = new StatelessProcessScheduler(extensionManager);
- provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+ provenanceRepo = new VolatileProvenanceRepository(1_000);
provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
final SSLContext sslContext;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java
new file mode 100644
index 0000000..3f67385
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.RingBuffer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VolatileProvenanceRepository implements ProvenanceRepository {
+
+    // default property values
+    public static final int DEFAULT_BUFFER_SIZE = 10000;
+
+    public static final String CONTAINER_NAME = "in-memory"; // final: the name returned by getContainerNames() must not be reassignable
+
+    private final RingBuffer<ProvenanceEventRecord> ringBuffer; // holds the registered events
+    private final int maxSize; // capacity handed to the ring buffer and reported as the container capacity
+
+    private final AtomicLong idGenerator = new AtomicLong(0L); // source of event ids; the first id handed out is 0
+    private final AtomicBoolean initialized = new AtomicBoolean(false); // set to true by initialize(...)
+
+    /**
+     * No-args constructor for service loading; allocates a buffer of DEFAULT_BUFFER_SIZE so the instance is usable instead of failing on a null ring buffer.
+     */
+    public VolatileProvenanceRepository() {
+        this(DEFAULT_BUFFER_SIZE);
+    }
+
+    /** Creates a repository whose ring buffer is constructed with a capacity of {@code maxEvents}. */
+    public VolatileProvenanceRepository(final int maxEvents) {
+        maxSize = maxEvents;
+        ringBuffer = new RingBuffer<>(maxSize);
+    }
+
+    @Override
+    public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory,
+                           final IdentifierLookup idLookup) throws IOException {
+        // None of the arguments are used and nothing else is set up here;
+        // the call only records that initialization was requested, so repeated calls are harmless.
+        initialized.set(true);
+    }
+
+ @Override
+ public ProvenanceEventRepository getProvenanceEventRepository() {
+ return this;
+ }
+
+ @Override
+ public ProvenanceEventBuilder eventBuilder() {
+ return new StandardProvenanceEventRecord.Builder();
+ }
+
+ @Override
+ public void registerEvent(final ProvenanceEventRecord event) {
+ final long id = idGenerator.getAndIncrement();
+ ringBuffer.add(new IdEnrichedProvEvent(event, id));
+ }
+
+ @Override
+ public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+ for (final ProvenanceEventRecord event : events) {
+ registerEvent(event);
+ }
+ }
+
+ @Override
+ public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
+ return getEvents(firstRecordId, maxRecords, null);
+ }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
+        // Select events that pass isAuthorized(...) for the user and whose id is at or beyond firstRecordId;
+        // maxRecords is handed to the ring buffer as the limit on how many matches to return.
+        final RingBuffer.Filter<ProvenanceEventRecord> visibleFromFirstId = new RingBuffer.Filter<ProvenanceEventRecord>() {
+            @Override
+            public boolean select(final ProvenanceEventRecord candidate) {
+                return isAuthorized(candidate, user) && candidate.getEventId() >= firstRecordId;
+            }
+        };
+
+        return ringBuffer.getSelectedElements(visibleFromFirstId, maxRecords);
+    }
+
+ @Override
+ public Long getMaxEventId() {
+ final ProvenanceEventRecord newest = ringBuffer.getNewestElement();
+ return (newest == null) ? null : newest.getEventId();
+ }
+
+ public ProvenanceEventRecord getEvent(final String identifier) throws IOException {
+ final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
+ @Override
+ public boolean select(final ProvenanceEventRecord event) {
+ return identifier.equals(event.getFlowFileUuid());
+ }
+ }, 1);
+ return records.isEmpty() ? null : records.get(0);
+ }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long id) {
+        final RingBuffer.Filter<ProvenanceEventRecord> matchesId = new RingBuffer.Filter<ProvenanceEventRecord>() { // selects by event id
+            @Override
+            public boolean select(final ProvenanceEventRecord candidate) {
+                return id == candidate.getEventId();
+            }
+        };
+        final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(matchesId, 1); // a single match is requested
+        return matches.isEmpty() ? null : matches.get(0); // null when no buffered event has this id
+    }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) {
+        final ProvenanceEventRecord found = getEvent(id);
+        // Authorization is only checked when an event with this id exists;
+        // a missing event is reported as null without calling authorize(...).
+        if (found != null) {
+            authorize(found, user);
+        }
+        return found;
+    }
+
+ public boolean isAuthorized(final ProvenanceEventRecord event, final NiFiUser user) {
+ return true;
+ }
+
+ protected void authorize(final ProvenanceEventRecord event, final NiFiUser user) {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public List<SearchableField> getSearchableFields() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<SearchableField> getSearchableAttributes() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public QuerySubmission submitQuery(final Query query, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ComputeLineageSubmission submitExpandParents(final long eventId, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ComputeLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getContainerCapacity(final String containerName) throws IOException {
+ return maxSize;
+ }
+
+ @Override
+ public Set<String> getContainerNames() {
+ return Collections.singleton(CONTAINER_NAME);
+ }
+
+ @Override
+ public long getContainerUsableSpace(String containerName) throws IOException {
+ return maxSize - ringBuffer.getSize();
+ }
+
+ @Override
+ public String getContainerFileStoreName(String containerName) {
+ return null;
+ }
+
+ private static class IdEnrichedProvEvent implements ProvenanceEventRecord {
+
+ private final ProvenanceEventRecord record;
+ private final long id;
+
+ public IdEnrichedProvEvent(final ProvenanceEventRecord record, final long id) {
+ this.record = record;
+ this.id = id;
+ }
+
+ @Override
+ public long getEventId() {
+ return id;
+ }
+
+ @Override
+ public long getEventTime() {
+ return record.getEventTime();
+ }
+
+ @Override
+ public long getFlowFileEntryDate() {
+ return record.getFlowFileEntryDate();
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return record.getLineageStartDate();
+ }
+
+ @Override
+ public long getFileSize() {
+ return record.getFileSize();
+ }
+
+ @Override
+ public Long getPreviousFileSize() {
+ return record.getPreviousFileSize();
+ }
+
+ @Override
+ public long getEventDuration() {
+ return record.getEventDuration();
+ }
+
+ @Override
+ public ProvenanceEventType getEventType() {
+ return record.getEventType();
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return record.getAttributes();
+ }
+
+ @Override
+ public Map<String, String> getPreviousAttributes() {
+ return record.getPreviousAttributes();
+ }
+
+ @Override
+ public Map<String, String> getUpdatedAttributes() {
+ return record.getUpdatedAttributes();
+ }
+
+ @Override
+ public String getComponentId() {
+ return record.getComponentId();
+ }
+
+ @Override
+ public String getComponentType() {
+ return record.getComponentType();
+ }
+
+ @Override
+ public String getTransitUri() {
+ return record.getTransitUri();
+ }
+
+ @Override
+ public String getSourceSystemFlowFileIdentifier() {
+ return record.getSourceSystemFlowFileIdentifier();
+ }
+
+ @Override
+ public String getFlowFileUuid() {
+ return record.getFlowFileUuid();
+ }
+
+ @Override
+ public List<String> getParentUuids() {
+ return record.getParentUuids();
+ }
+
+ @Override
+ public List<String> getChildUuids() {
+ return record.getChildUuids();
+ }
+
+ @Override
+ public String getAlternateIdentifierUri() {
+ return record.getAlternateIdentifierUri();
+ }
+
+ @Override
+ public String getDetails() {
+ return record.getDetails();
+ }
+
+ @Override
+ public String getRelationship() {
+ return record.getRelationship();
+ }
+
+ @Override
+ public String getSourceQueueIdentifier() {
+ return record.getSourceQueueIdentifier();
+ }
+
+ @Override
+ public String getContentClaimSection() {
+ return record.getContentClaimSection();
+ }
+
+ @Override
+ public String getPreviousContentClaimSection() {
+ return record.getPreviousContentClaimSection();
+ }
+
+ @Override
+ public String getContentClaimContainer() {
+ return record.getContentClaimContainer();
+ }
+
+ @Override
+ public String getPreviousContentClaimContainer() {
+ return record.getPreviousContentClaimContainer();
+ }
+
+ @Override
+ public String getContentClaimIdentifier() {
+ return record.getContentClaimIdentifier();
+ }
+
+ @Override
+ public String getPreviousContentClaimIdentifier() {
+ return record.getPreviousContentClaimIdentifier();
+ }
+
+ @Override
+ public Long getContentClaimOffset() {
+ return record.getContentClaimOffset();
+ }
+
+ @Override
+ public Long getPreviousContentClaimOffset() {
+ return record.getPreviousContentClaimOffset();
+ }
+
+        /**
+         * Returns the identifier to use when tracing this event: the id supplied at construction, rendered as a decimal string.
+         *
+         * @return the value of {@link #getEventId()} converted with {@link Long#toString(long)}
+         */
+        @Override
+        public String getBestEventIdentifier() {
+            return Long.toString(getEventId());
+        }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml
index 5d3b618..84dc34a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml
@@ -30,15 +30,6 @@
<artifactId>nifi-stateless-engine</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
-
- <!-- Explicitly bring in the Volatile Provenance Repository with optional flag set to true so that it gets packaged. -->
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-volatile-provenance-repository</artifactId>
- <version>1.13.0-SNAPSHOT</version>
- <scope>compile</scope>
- <optional>true</optional>
- </dependency>
</dependencies>
</project>
\ No newline at end of file