You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/12/03 19:43:05 UTC

[nifi] 01/02: NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateless-engine and remove dependency on nifi-volatile-provenance-repo module

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 8ac8a2bd1fbf75b8458554988eb1b3b1851b25d9
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Wed Dec 2 14:57:49 2020 -0500

    NIFI-8060 Added minimal VolatileProvenanceRepository to nifi-stateless-engine and remove dependency on nifi-volatile-provenance-repo module
---
 .../nifi-stateless-engine/pom.xml                  |   9 -
 .../flow/StandardStatelessDataflowFactory.java     |   4 +-
 .../repository/VolatileProvenanceRepository.java   | 409 +++++++++++++++++++++
 .../nifi-stateless-nar/pom.xml                     |   9 -
 4 files changed, 411 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
index 000e9fb..9d29e05 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/pom.xml
@@ -91,15 +91,6 @@
             <version>${jackson.version}</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-volatile-provenance-repository</artifactId>
-            <version>1.13.0-SNAPSHOT</version>
-            <scope>compile</scope>
-            <optional>true</optional>
-        </dependency>
-
-
         <!-- Test Dependencies -->
         <dependency>
             <groupId>junit</groupId>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
index 02a4b34..061d840 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
@@ -47,7 +47,6 @@ import org.apache.nifi.parameter.ParameterContextManager;
 import org.apache.nifi.parameter.StandardParameterContextManager;
 import org.apache.nifi.provenance.IdentifierLookup;
 import org.apache.nifi.provenance.ProvenanceRepository;
-import org.apache.nifi.provenance.VolatileProvenanceRepository;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.FlowRegistryClient;
 import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
@@ -75,6 +74,7 @@ import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
 import org.apache.nifi.stateless.repository.RepositoryContextFactory;
 import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
 import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.apache.nifi.stateless.repository.VolatileProvenanceRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,7 +127,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
 
             final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
             processScheduler = new StatelessProcessScheduler(extensionManager);
-            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo = new VolatileProvenanceRepository(1_000);
             provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
 
             final SSLContext sslContext;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java
new file mode 100644
index 0000000..3f67385
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/VolatileProvenanceRepository.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.repository;
+
+import org.apache.nifi.authorization.Authorizer;
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.AsyncLineageSubmission;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.RingBuffer;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class VolatileProvenanceRepository implements ProvenanceRepository {
+
+    // default property values
+    public static final int DEFAULT_BUFFER_SIZE = 10000;
+
+    public static String CONTAINER_NAME = "in-memory";
+
+    private final RingBuffer<ProvenanceEventRecord> ringBuffer;
+    private final int maxSize;
+
+    private final AtomicLong idGenerator = new AtomicLong(0L);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    /**
+     * Default no args constructor for service loading only
+     */
+    public VolatileProvenanceRepository() {
+        ringBuffer = null;
+        maxSize = DEFAULT_BUFFER_SIZE;
+    }
+
+    public VolatileProvenanceRepository(final int maxEvents) {
+        maxSize = maxEvents;
+        ringBuffer = new RingBuffer<>(maxSize);
+    }
+
+    @Override
+    public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory,
+                           final IdentifierLookup idLookup) throws IOException {
+        if (initialized.getAndSet(true)) {
+            return;
+        }
+    }
+
+    @Override
+    public ProvenanceEventRepository getProvenanceEventRepository() {
+        return this;
+    }
+
+    @Override
+    public ProvenanceEventBuilder eventBuilder() {
+        return new StandardProvenanceEventRecord.Builder();
+    }
+
+    @Override
+    public void registerEvent(final ProvenanceEventRecord event) {
+        final long id = idGenerator.getAndIncrement();
+        ringBuffer.add(new IdEnrichedProvEvent(event, id));
+    }
+
+    @Override
+    public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+        for (final ProvenanceEventRecord event : events) {
+            registerEvent(event);
+        }
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
+        return getEvents(firstRecordId, maxRecords, null);
+    }
+
+    @Override
+    public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
+        return ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
+            @Override
+            public boolean select(final ProvenanceEventRecord value) {
+                if (!isAuthorized(value, user)) {
+                    return false;
+                }
+
+                return value.getEventId() >= firstRecordId;
+            }
+        }, maxRecords);
+    }
+
+    @Override
+    public Long getMaxEventId() {
+        final ProvenanceEventRecord newest = ringBuffer.getNewestElement();
+        return (newest == null) ? null : newest.getEventId();
+    }
+
+    public ProvenanceEventRecord getEvent(final String identifier) throws IOException {
+        final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
+            @Override
+            public boolean select(final ProvenanceEventRecord event) {
+                return identifier.equals(event.getFlowFileUuid());
+            }
+        }, 1);
+        return records.isEmpty() ? null : records.get(0);
+    }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long id) {
+        final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
+            @Override
+            public boolean select(final ProvenanceEventRecord event) {
+                return event.getEventId() == id;
+            }
+        }, 1);
+
+        return records.isEmpty() ? null : records.get(0);
+    }
+
+    @Override
+    public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) {
+        final ProvenanceEventRecord event = getEvent(id);
+        if (event == null) {
+            return null;
+        }
+
+        authorize(event, user);
+        return event;
+    }
+
+    public boolean isAuthorized(final ProvenanceEventRecord event, final NiFiUser user) {
+        return true;
+    }
+
+    protected void authorize(final ProvenanceEventRecord event, final NiFiUser user) {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public List<SearchableField> getSearchableFields() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<SearchableField> getSearchableAttributes() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public QuerySubmission submitQuery(final Query query, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ComputeLineageSubmission submitExpandParents(final long eventId, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ComputeLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getContainerCapacity(final String containerName) throws IOException {
+        return maxSize;
+    }
+
+    @Override
+    public Set<String> getContainerNames() {
+        return Collections.singleton(CONTAINER_NAME);
+    }
+
+    @Override
+    public long getContainerUsableSpace(String containerName) throws IOException {
+        return maxSize - ringBuffer.getSize();
+    }
+
+    @Override
+    public String getContainerFileStoreName(String containerName) {
+        return null;
+    }
+
+    private static class IdEnrichedProvEvent implements ProvenanceEventRecord {
+
+        private final ProvenanceEventRecord record;
+        private final long id;
+
+        public IdEnrichedProvEvent(final ProvenanceEventRecord record, final long id) {
+            this.record = record;
+            this.id = id;
+        }
+
+        @Override
+        public long getEventId() {
+            return id;
+        }
+
+        @Override
+        public long getEventTime() {
+            return record.getEventTime();
+        }
+
+        @Override
+        public long getFlowFileEntryDate() {
+            return record.getFlowFileEntryDate();
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return record.getLineageStartDate();
+        }
+
+        @Override
+        public long getFileSize() {
+            return record.getFileSize();
+        }
+
+        @Override
+        public Long getPreviousFileSize() {
+            return record.getPreviousFileSize();
+        }
+
+        @Override
+        public long getEventDuration() {
+            return record.getEventDuration();
+        }
+
+        @Override
+        public ProvenanceEventType getEventType() {
+            return record.getEventType();
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return record.getAttributes();
+        }
+
+        @Override
+        public Map<String, String> getPreviousAttributes() {
+            return record.getPreviousAttributes();
+        }
+
+        @Override
+        public Map<String, String> getUpdatedAttributes() {
+            return record.getUpdatedAttributes();
+        }
+
+        @Override
+        public String getComponentId() {
+            return record.getComponentId();
+        }
+
+        @Override
+        public String getComponentType() {
+            return record.getComponentType();
+        }
+
+        @Override
+        public String getTransitUri() {
+            return record.getTransitUri();
+        }
+
+        @Override
+        public String getSourceSystemFlowFileIdentifier() {
+            return record.getSourceSystemFlowFileIdentifier();
+        }
+
+        @Override
+        public String getFlowFileUuid() {
+            return record.getFlowFileUuid();
+        }
+
+        @Override
+        public List<String> getParentUuids() {
+            return record.getParentUuids();
+        }
+
+        @Override
+        public List<String> getChildUuids() {
+            return record.getChildUuids();
+        }
+
+        @Override
+        public String getAlternateIdentifierUri() {
+            return record.getAlternateIdentifierUri();
+        }
+
+        @Override
+        public String getDetails() {
+            return record.getDetails();
+        }
+
+        @Override
+        public String getRelationship() {
+            return record.getRelationship();
+        }
+
+        @Override
+        public String getSourceQueueIdentifier() {
+            return record.getSourceQueueIdentifier();
+        }
+
+        @Override
+        public String getContentClaimSection() {
+            return record.getContentClaimSection();
+        }
+
+        @Override
+        public String getPreviousContentClaimSection() {
+            return record.getPreviousContentClaimSection();
+        }
+
+        @Override
+        public String getContentClaimContainer() {
+            return record.getContentClaimContainer();
+        }
+
+        @Override
+        public String getPreviousContentClaimContainer() {
+            return record.getPreviousContentClaimContainer();
+        }
+
+        @Override
+        public String getContentClaimIdentifier() {
+            return record.getContentClaimIdentifier();
+        }
+
+        @Override
+        public String getPreviousContentClaimIdentifier() {
+            return record.getPreviousContentClaimIdentifier();
+        }
+
+        @Override
+        public Long getContentClaimOffset() {
+            return record.getContentClaimOffset();
+        }
+
+        @Override
+        public Long getPreviousContentClaimOffset() {
+            return record.getPreviousContentClaimOffset();
+        }
+
+        /**
+         * Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability).
+         *
+         * @return a descriptive event ID to allow tracing
+         */
+        @Override
+        public String getBestEventIdentifier() {
+            return Long.toString(getEventId());
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml
index 5d3b618..84dc34a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-nar/pom.xml
@@ -30,15 +30,6 @@
             <artifactId>nifi-stateless-engine</artifactId>
             <version>1.13.0-SNAPSHOT</version>
         </dependency>
-
-        <!-- Explicitly bring in the Volatile Provenance Repository with optional flag set to true so that it gets packaged. -->
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-volatile-provenance-repository</artifactId>
-            <version>1.13.0-SNAPSHOT</version>
-            <scope>compile</scope>
-            <optional>true</optional>
-        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file