You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/02 20:32:10 UTC

[13/50] [abbrv] nifi git commit: NIFI-10: Added FETCH and DOWNLOAD Provenance Events; updated FlowController to use DOWNLOAD event instead of SEND whenever a user downloads/views content via Provenance Event

NIFI-10: Added FETCH and DOWNLOAD Provenance Events; updated FlowController to use DOWNLOAD event instead of SEND whenever a user downloads/views content via Provenance Event


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fc2aa276
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fc2aa276
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fc2aa276

Branch: refs/heads/NIFI-730
Commit: fc2aa2764cc9e85a19d3f3eec640873f43c60148
Parents: 51f5640
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 25 11:53:46 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 14:58:50 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceEventType.java    |   5 +
 .../apache/nifi/controller/FlowController.java  | 130 +++++++++----------
 2 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2aa276/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
index 188e8fc..0d844b8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
@@ -47,6 +47,11 @@ public enum ProvenanceEventType {
     SEND,
 
     /**
+     * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
+     */
+    DOWNLOAD,
+
+    /**
      * Indicates a provenance event for the conclusion of an object's life for
      * some reason other than object expiration
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2aa276/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d9c3f39..3f815b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -216,7 +216,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
     public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 5-minute captures
+    public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
@@ -245,7 +245,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final UserService userService;
     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
     private final ComponentStatusRepository componentStatusRepository;
-    private final long systemStartTime = System.currentTimeMillis();    // time at which the node was started
+    private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
 
     // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
@@ -336,38 +336,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
-    private FlowFileSwapManager flowFileSwapManager;    // guarded by read/write lock
+    private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
     private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
 
     public static FlowController createStandaloneInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor) {
         return new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ false,
-                /* NodeProtocolSender */ null);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ false,
+            /* NodeProtocolSender */ null);
     }
 
     public static FlowController createClusteredInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final NodeProtocolSender protocolSender) {
         final FlowController flowController = new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ true,
-                /* NodeProtocolSender */ protocolSender);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ true,
+            /* NodeProtocolSender */ protocolSender);
 
         flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
 
@@ -375,12 +375,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     private FlowController(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final boolean configuredForClustering,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final boolean configuredForClustering,
+        final NodeProtocolSender protocolSender) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(5);
@@ -416,7 +416,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
+            eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@@ -468,7 +468,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             externalSiteListener = null;
         } else if (isSiteToSiteSecure && sslContext == null) {
             LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
-                    + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
+                + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
             externalSiteListener = null;
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
@@ -501,7 +501,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -612,7 +612,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
+                        LOG.error("Unable to start {} due to {}", new Object[] {connectable, t.toString()});
                         if (LOG.isDebugEnabled()) {
                             LOG.error("", t);
                         }
@@ -627,7 +627,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
                         startedTransmitting++;
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
+                        LOG.error("Unable to start transmitting with {} due to {}", new Object[] {remoteGroupPort, t});
                     }
                 }
 
@@ -642,7 +642,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
+                        LOG.error("Unable to start {} due to {}", new Object[] {connectable, t});
                     }
                 }
 
@@ -658,7 +658,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -676,7 +676,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+                + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
         }
 
         try {
@@ -690,7 +690,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -910,7 +910,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -927,7 +927,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -1083,14 +1083,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             try {
                 flowFileRepository.close();
             } catch (final Throwable t) {
-                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
+                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] {t});
             }
 
             if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
                 LOG.info("Controller has been terminated successfully.");
             } else {
                 LOG.warn("Controller hasn't terminated properly.  There exists an uninterruptable thread that "
-                        + "will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
+                    + "will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
             }
 
             if (externalSiteListener != null) {
@@ -1153,7 +1153,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used
      */
     public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+        throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
@@ -1199,7 +1199,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @param maxThreadCount
      *
-     * This method must be called while holding the write lock!
+     *            This method must be called while holding the write lock!
      */
     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
@@ -1267,7 +1267,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws ProcessorInstantiationException
      *
      * @throws IllegalStateException if no process group can be found with the ID of DTO or with the ID of the DTO's parentGroupId, if the template ID specified is invalid, or if the DTO's Parent
-     * Group ID changes but the parent group has incoming or outgoing connections
+     *             Group ID changes but the parent group has incoming or outgoing connections
      *
      * @throws NullPointerException if the DTO or its ID is null
      */
@@ -1371,7 +1371,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @throws NullPointerException if either argument is null
      * @throws IllegalStateException if the snippet is not valid because a component in the snippet has an ID that is not unique to this flow, or because it shares an Input Port or Output Port at the
-     * root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
+     *             root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
      * @throws ProcessorInstantiationException if unable to instantiate a processor
      */
     public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
@@ -2542,7 +2542,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         if (firstTimeAdded) {
             final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
             final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
 
             try {
                 task.initialize(config);
@@ -2888,7 +2888,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         readLock.lock();
         try {
             return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
-                    && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
+                && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -2948,7 +2948,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     /**
      * @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN
-     * is not available - for instance, if cluster communications are not secure
+     *         is not available - for instance, if cluster communications are not secure
      */
     public String getClusterManagerDN() {
         readLock.lock();
@@ -3101,10 +3101,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isContentSame() {
                 return areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer())
-                        && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
-                        && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
-                        && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
-                        && areEqual(event.getPreviousFileSize(), event.getFileSize());
+                    && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
+                    && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
+                    && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
+                    && areEqual(event.getPreviousFileSize(), event.getFileSize());
             }
 
             @Override
@@ -3180,7 +3180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
-            .setEventType(ProvenanceEventType.SEND)
+            .setEventType(ProvenanceEventType.DOWNLOAD)
             .setFlowFileUUID(provEvent.getFlowFileUuid())
             .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
             .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
@@ -3297,7 +3297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
+            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
         contentClaimManager.incrementClaimantCount(resourceClaim);
@@ -3367,7 +3367,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
         final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord);
         record.setDestination(queue);
-        flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record));
+        flowFileRepository.updateRepository(Collections.<RepositoryRecord> singleton(record));
 
         // Enqueue the data
         queue.put(flowFileRecord);
@@ -3434,11 +3434,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 protocolSender.sendBulletins(message);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            String.format(
-                                    "Sending bulletins to cluster manager at %s",
-                                    dateFormatter.format(new Date())
-                            )
-                    );
+                        String.format(
+                            "Sending bulletins to cluster manager at %s",
+                            dateFormatter.format(new Date())));
                 }
 
             } catch (final UnknownServiceAddressException usae) {
@@ -3496,7 +3494,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     } else {
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
-                                bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
+                            bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {
                     escapedBulletin = bulletin;
@@ -3554,9 +3552,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
                 heartbeatLogger.info("Heartbeat created at {} and sent at {}; send took {} millis",
-                        dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
-                        dateFormatter.format(new Date()),
-                        sendMillis);
+                    dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
+                    dateFormatter.format(new Date()),
+                    sendMillis);
             } catch (final UnknownServiceAddressException usae) {
                 if (heartbeatLogger.isDebugEnabled()) {
                     heartbeatLogger.debug(usae.getMessage());