You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/26 22:10:20 UTC

[1/3] nifi git commit: NIFI-10: Added FETCH and DOWNLOAD Provenance Events; updated FlowController to use DOWNLOAD event instead of SEND whenever a user downloads/views content via Provenance Event

Repository: nifi
Updated Branches:
  refs/heads/master aec32a277 -> 17006335e


NIFI-10: Added FETCH and DOWNLOAD Provenance Events; updated FlowController to use DOWNLOAD event instead of SEND whenever a user downloads/views content via Provenance Event


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fc2aa276
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fc2aa276
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fc2aa276

Branch: refs/heads/master
Commit: fc2aa2764cc9e85a19d3f3eec640873f43c60148
Parents: 51f5640
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Oct 25 11:53:46 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 14:58:50 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceEventType.java    |   5 +
 .../apache/nifi/controller/FlowController.java  | 130 +++++++++----------
 2 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2aa276/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
index 188e8fc..0d844b8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
@@ -47,6 +47,11 @@ public enum ProvenanceEventType {
     SEND,
 
     /**
+     * Indicates that the contents of a FlowFile were downloaded by a user or external entity.
+     */
+    DOWNLOAD,
+
+    /**
      * Indicates a provenance event for the conclusion of an object's life for
      * some reason other than object expiration
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/fc2aa276/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d9c3f39..3f815b0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -216,7 +216,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public static final String SCHEDULE_MINIMUM_NANOSECONDS = "flowcontroller.minimum.nanoseconds";
     public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
     public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
-    public static final int METRICS_RESERVOIR_SIZE = 288;   // 1 day worth of 5-minute captures
+    public static final int METRICS_RESERVOIR_SIZE = 288; // 1 day worth of 5-minute captures
 
     public static final String ROOT_GROUP_ID_ALIAS = "root";
     public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
@@ -245,7 +245,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final UserService userService;
     private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
     private final ComponentStatusRepository componentStatusRepository;
-    private final long systemStartTime = System.currentTimeMillis();    // time at which the node was started
+    private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
     private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
 
     // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
@@ -336,38 +336,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private final Lock readLock = rwLock.readLock();
     private final Lock writeLock = rwLock.writeLock();
 
-    private FlowFileSwapManager flowFileSwapManager;    // guarded by read/write lock
+    private FlowFileSwapManager flowFileSwapManager; // guarded by read/write lock
 
     private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
     private static final Logger heartbeatLogger = LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat");
 
     public static FlowController createStandaloneInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor) {
         return new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ false,
-                /* NodeProtocolSender */ null);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ false,
+            /* NodeProtocolSender */ null);
     }
 
     public static FlowController createClusteredInstance(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final NodeProtocolSender protocolSender) {
         final FlowController flowController = new FlowController(
-                flowFileEventRepo,
-                properties,
-                userService,
-                encryptor,
-                /* configuredForClustering */ true,
-                /* NodeProtocolSender */ protocolSender);
+            flowFileEventRepo,
+            properties,
+            userService,
+            encryptor,
+            /* configuredForClustering */ true,
+            /* NodeProtocolSender */ protocolSender);
 
         flowController.setClusterManagerRemoteSiteInfo(properties.getRemoteInputPort(), properties.isSiteToSiteSecure());
 
@@ -375,12 +375,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     }
 
     private FlowController(
-            final FlowFileEventRepository flowFileEventRepo,
-            final NiFiProperties properties,
-            final UserService userService,
-            final StringEncryptor encryptor,
-            final boolean configuredForClustering,
-            final NodeProtocolSender protocolSender) {
+        final FlowFileEventRepository flowFileEventRepo,
+        final NiFiProperties properties,
+        final UserService userService,
+        final StringEncryptor encryptor,
+        final boolean configuredForClustering,
+        final NodeProtocolSender protocolSender) {
 
         maxTimerDrivenThreads = new AtomicInteger(10);
         maxEventDrivenThreads = new AtomicInteger(5);
@@ -416,7 +416,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
         processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
-                eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
+            eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
 
         final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
         final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
@@ -468,7 +468,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             externalSiteListener = null;
         } else if (isSiteToSiteSecure && sslContext == null) {
             LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
-                    + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
+                + "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
             externalSiteListener = null;
         } else {
             // Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
@@ -501,7 +501,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -612,7 +612,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
+                        LOG.error("Unable to start {} due to {}", new Object[] {connectable, t.toString()});
                         if (LOG.isDebugEnabled()) {
                             LOG.error("", t);
                         }
@@ -627,7 +627,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
                         startedTransmitting++;
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
+                        LOG.error("Unable to start transmitting with {} due to {}", new Object[] {remoteGroupPort, t});
                     }
                 }
 
@@ -642,7 +642,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                             startConnectable(connectable);
                         }
                     } catch (final Throwable t) {
-                        LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
+                        LOG.error("Unable to start {} due to {}", new Object[] {connectable, t});
                     }
                 }
 
@@ -658,7 +658,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -676,7 +676,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
+                + NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
         }
 
         try {
@@ -690,7 +690,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
         if (implementationClassName == null) {
             throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
-                    + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+                + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
         }
 
         try {
@@ -910,7 +910,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, TransferDirection.RECEIVE, ConnectableType.INPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -927,7 +927,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         name = requireNonNull(name).intern();
         verifyPortIdDoesNotExist(id);
         return new StandardRootGroupPort(id, name, null, TransferDirection.SEND, ConnectableType.OUTPUT_PORT,
-                userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
+            userService, getBulletinRepository(), processScheduler, Boolean.TRUE.equals(isSiteToSiteSecure));
     }
 
     /**
@@ -1083,14 +1083,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             try {
                 flowFileRepository.close();
             } catch (final Throwable t) {
-                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
+                LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[] {t});
             }
 
             if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
                 LOG.info("Controller has been terminated successfully.");
             } else {
                 LOG.warn("Controller hasn't terminated properly.  There exists an uninterruptable thread that "
-                        + "will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
+                    + "will take an indeterminate amount of time to stop.  Might need to kill the program manually.");
             }
 
             if (externalSiteListener != null) {
@@ -1153,7 +1153,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws FlowSynchronizationException if updates to the controller failed. If this exception is thrown, then the controller should be considered unsafe to be used
      */
     public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow)
-            throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
+        throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException {
         writeLock.lock();
         try {
             LOG.debug("Synchronizing controller with proposed flow");
@@ -1199,7 +1199,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @param maxThreadCount
      *
-     * This method must be called while holding the write lock!
+     *            This method must be called while holding the write lock!
      */
     private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
         if (maxThreadCount < 1) {
@@ -1267,7 +1267,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      * @throws ProcessorInstantiationException
      *
      * @throws IllegalStateException if no process group can be found with the ID of DTO or with the ID of the DTO's parentGroupId, if the template ID specified is invalid, or if the DTO's Parent
-     * Group ID changes but the parent group has incoming or outgoing connections
+     *             Group ID changes but the parent group has incoming or outgoing connections
      *
      * @throws NullPointerException if the DTO or its ID is null
      */
@@ -1371,7 +1371,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      *
      * @throws NullPointerException if either argument is null
      * @throws IllegalStateException if the snippet is not valid because a component in the snippet has an ID that is not unique to this flow, or because it shares an Input Port or Output Port at the
-     * root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
+     *             root level whose name already exists in the given ProcessGroup, or because the Template contains a Processor or a Prioritizer whose class is not valid within this instance of NiFi.
      * @throws ProcessorInstantiationException if unable to instantiate a processor
      */
     public void instantiateSnippet(final ProcessGroup group, final FlowSnippetDTO dto) throws ProcessorInstantiationException {
@@ -2542,7 +2542,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         if (firstTimeAdded) {
             final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
             final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
-                    SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
+                SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
 
             try {
                 task.initialize(config);
@@ -2888,7 +2888,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         readLock.lock();
         try {
             return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
-                    && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
+                && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
         } finally {
             readLock.unlock();
         }
@@ -2948,7 +2948,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     /**
      * @return the DN of the Cluster Manager that we are currently connected to, if available. This will return null if the instance is not clustered or if the instance is clustered but the NCM's DN
-     * is not available - for instance, if cluster communications are not secure
+     *         is not available - for instance, if cluster communications are not secure
      */
     public String getClusterManagerDN() {
         readLock.lock();
@@ -3101,10 +3101,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             @Override
             public boolean isContentSame() {
                 return areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer())
-                        && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
-                        && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
-                        && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
-                        && areEqual(event.getPreviousFileSize(), event.getFileSize());
+                    && areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
+                    && areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
+                    && areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
+                    && areEqual(event.getPreviousFileSize(), event.getFileSize());
             }
 
             @Override
@@ -3180,7 +3180,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         // Register a Provenance Event to indicate that we replayed the data.
         final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
-            .setEventType(ProvenanceEventType.SEND)
+            .setEventType(ProvenanceEventType.DOWNLOAD)
             .setFlowFileUUID(provEvent.getFlowFileUuid())
             .setAttributes(provEvent.getAttributes(), Collections.<String, String> emptyMap())
             .setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
@@ -3297,7 +3297,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
         // Create the ContentClaim
         final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-                event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
+            event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
 
         // Increment Claimant Count, since we will now be referencing the Content Claim
         contentClaimManager.incrementClaimantCount(resourceClaim);
@@ -3367,7 +3367,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         // Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
         final StandardRepositoryRecord record = new StandardRepositoryRecord(queue, flowFileRecord);
         record.setDestination(queue);
-        flowFileRepository.updateRepository(Collections.<RepositoryRecord>singleton(record));
+        flowFileRepository.updateRepository(Collections.<RepositoryRecord> singleton(record));
 
         // Enqueue the data
         queue.put(flowFileRecord);
@@ -3434,11 +3434,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 protocolSender.sendBulletins(message);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(
-                            String.format(
-                                    "Sending bulletins to cluster manager at %s",
-                                    dateFormatter.format(new Date())
-                            )
-                    );
+                        String.format(
+                            "Sending bulletins to cluster manager at %s",
+                            dateFormatter.format(new Date())));
                 }
 
             } catch (final UnknownServiceAddressException usae) {
@@ -3496,7 +3494,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     } else {
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
-                                bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
+                            bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {
                     escapedBulletin = bulletin;
@@ -3554,9 +3552,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
 
                 heartbeatLogger.info("Heartbeat created at {} and sent at {}; send took {} millis",
-                        dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
-                        dateFormatter.format(new Date()),
-                        sendMillis);
+                    dateFormatter.format(new Date(message.getHeartbeat().getCreatedTimestamp())),
+                    dateFormatter.format(new Date()),
+                    sendMillis);
             } catch (final UnknownServiceAddressException usae) {
                 if (heartbeatLogger.isDebugEnabled()) {
                     heartbeatLogger.debug(usae.getMessage());


[2/3] nifi git commit: NIFI-10: Added FETCH Provenance Event and updated processors to use this new event type

Posted by ma...@apache.org.
NIFI-10: Added FETCH Provenance Event and updated processors to use this new event type


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/51f56402
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/51f56402
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/51f56402

Branch: refs/heads/master
Commit: 51f564024a2fbe7fbd08760635561f08619be0e4
Parents: aec32a2
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Oct 15 17:00:20 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 14:58:50 2015 -0400

----------------------------------------------------------------------
 .../nifi/provenance/ProvenanceEventType.java    |  26 ++-
 .../nifi/provenance/ProvenanceReporter.java     |  37 ++++
 .../nifi/util/MockProvenanceReporter.java       |  35 ++-
 .../nifi/processors/aws/s3/FetchS3Object.java   |   2 +-
 .../repository/StandardProvenanceReporter.java  |  31 ++-
 .../nifi/processors/standard/InvokeHTTP.java    | 211 +++++++++----------
 .../processors/standard/TestInvokeHTTP.java     |  54 ++---
 7 files changed, 258 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
index e5e47b7..188e8fc 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventType.java
@@ -23,45 +23,66 @@ public enum ProvenanceEventType {
      * not received from a remote system or external process
      */
     CREATE,
+
     /**
-     * Indicates a provenance event for receiving data from an external process
+     * Indicates a provenance event for receiving data from an external process. This Event Type
+     * is expected to be the first event for a FlowFile. As such, a Processor that receives data
+     * from an external source and uses that data to replace the content of an existing FlowFile
+     * should use the {@link #FETCH} event type, rather than the RECEIVE event type.
      */
     RECEIVE,
+
+    /**
+     * Indicates that the contents of a FlowFile were overwritten using the contents of some
+     * external resource. This is similar to the {@link #RECEIVE} event but varies in that
+     * RECEIVE events are intended to be used as the event that introduces the FlowFile into
+     * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile
+     * were overwritten.
+     */
+    FETCH,
+
     /**
      * Indicates a provenance event for sending data to an external process
      */
     SEND,
+
     /**
      * Indicates a provenance event for the conclusion of an object's life for
      * some reason other than object expiration
      */
     DROP,
+
     /**
      * Indicates a provenance event for the conclusion of an object's life due
      * to the fact that the object could not be processed in a timely manner
      */
     EXPIRE,
+
     /**
      * FORK is used to indicate that one or more FlowFile was derived from a
      * parent FlowFile.
      */
     FORK,
+
     /**
      * JOIN is used to indicate that a single FlowFile is derived from joining
      * together multiple parent FlowFiles.
      */
     JOIN,
+
     /**
      * CLONE is used to indicate that a FlowFile is an exact duplicate of its
      * parent FlowFile.
      */
     CLONE,
+
     /**
      * CONTENT_MODIFIED is used to indicate that a FlowFile's content was
      * modified in some way. When using this Event Type, it is advisable to
      * provide details about how the content is modified.
      */
     CONTENT_MODIFIED,
+
     /**
      * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were
      * modified in some way. This event is not needed when another event is
@@ -69,17 +90,20 @@ public enum ProvenanceEventType {
      * FlowFile attributes.
      */
     ATTRIBUTES_MODIFIED,
+
     /**
      * ROUTE is used to show that a FlowFile was routed to a specified
      * {@link org.apache.nifi.processor.Relationship Relationship} and should provide
      * information about why the FlowFile was routed to this relationship.
      */
     ROUTE,
+
     /**
      * Indicates a provenance event for adding additional information such as a
      * new linkage to a new URI or UUID
      */
     ADDINFO,
+
     /**
      * Indicates a provenance event for replaying a FlowFile. The UUID of the
      * event will indicate the UUID of the original FlowFile that is being

http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
index db589f8..0fd29fd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
@@ -124,6 +124,43 @@ public interface ProvenanceReporter {
     void receive(FlowFile flowFile, String transitUri, String sourceSystemFlowFileIdentifier, String details, long transmissionMillis);
 
     /**
+     * Emits a Provenance Event of type
+     * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
+     * FlowFile was overwritten with the data received from an external source.
+     *
+     * @param flowFile the FlowFile whose content was replaced
+     * @param transitUri A URI that provides information about the System and
+     * Protocol information over which the transfer occurred.
+     */
+    void fetch(FlowFile flowFile, String transitUri);
+
+    /**
+     * Emits a Provenance Event of type
+     * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
+     * FlowFile was overwritten with the data received from an external source.
+     *
+     * @param flowFile the FlowFile whose content was replaced
+     * @param transitUri A URI that provides information about the System and
+     * Protocol information over which the transfer occurred.
+     * @param transmissionMillis the number of milliseconds taken to transfer the data
+     */
+    void fetch(FlowFile flowFile, String transitUri, long transmissionMillis);
+
+    /**
+     * Emits a Provenance Event of type
+     * {@link ProvenanceEventType#FETCH FETCH} that indicates that the content of the given
+     * FlowFile was overwritten with the data received from an external source.
+     *
+     * @param flowFile the FlowFile whose content was replaced
+     * @param transitUri A URI that provides information about the System and
+     * Protocol information over which the transfer occurred.
+     * @param details details about the event
+     * @param transmissionMillis the number of milliseconds taken to transfer
+     * the data
+     */
+    void fetch(FlowFile flowFile, String transitUri, String details, long transmissionMillis);
+    
+    /**
      * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND}
      * that indicates that a copy of the given FlowFile was sent to an external
      * destination. The external destination may be a remote system or may be a

http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
index 8c9a320..8458715 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProvenanceReporter.java
@@ -124,7 +124,40 @@ public class MockProvenanceReporter implements ProvenanceReporter {
 
         try {
             final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
-                .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+                .setTransitUri(transitUri)
+                .setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier)
+                .setEventDuration(transmissionMillis)
+                .setDetails(details)
+                .build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void fetch(final FlowFile flowFile, final String transitUri) {
+        fetch(flowFile, transitUri, -1L);
+    }
+
+    @Override
+    public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
+        fetch(flowFile, transitUri, null, transmissionMillis);
+    }
+
+    @Override
+    public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
+                .setTransitUri(transitUri)
+                .setEventDuration(transmissionMillis)
+                .setDetails(details)
+                .build();
             events.add(record);
         } catch (final Exception e) {
             logger.error("Failed to generate Provenance Event due to " + e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 131e671..bc6aeec 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -152,7 +152,7 @@ public class FetchS3Object extends AbstractS3Processor {
         session.transfer(flowFile, REL_SUCCESS);
         final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
-        session.getProvenanceReporter().receive(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
+        session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 8852f42..8a89dbf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -124,7 +124,36 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
 
         try {
             final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE)
-                    .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+                .setTransitUri(transitUri).setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier).setEventDuration(transmissionMillis).setDetails(details).build();
+            events.add(record);
+        } catch (final Exception e) {
+            logger.error("Failed to generate Provenance Event due to " + e);
+            if (logger.isDebugEnabled()) {
+                logger.error("", e);
+            }
+        }
+    }
+
+    @Override
+    public void fetch(final FlowFile flowFile, final String transitUri) {
+        fetch(flowFile, transitUri, -1L);
+    }
+
+    @Override
+    public void fetch(final FlowFile flowFile, final String transitUri, final long transmissionMillis) {
+        fetch(flowFile, transitUri, null, transmissionMillis);
+    }
+
+    @Override
+    public void fetch(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis) {
+        verifyFlowFileKnown(flowFile);
+
+        try {
+            final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH)
+                .setTransitUri(transitUri)
+                .setEventDuration(transmissionMillis)
+                .setDetails(details)
+                .build();
             events.add(record);
         } catch (final Exception e) {
             logger.error("Failed to generate Provenance Event due to " + e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
index c7be728..848652a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java
@@ -89,7 +89,7 @@ import org.joda.time.format.DateTimeFormatter;
     @WritesAttribute(attribute = "invokehttp.tx.id", description = "The transaction ID that is returned after reading the response"),
     @WritesAttribute(attribute = "invokehttp.remote.dn", description = "The DN of the remote server")})
 @DynamicProperty(name = "Trusted Hostname", value = "A hostname", description = "Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
-        + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
+    + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
 public final class InvokeHTTP extends AbstractProcessor {
 
     @Override
@@ -170,76 +170,75 @@ public final class InvokeHTTP extends AbstractProcessor {
         // This set includes our strings defined above as well as some standard flowfile
         // attributes.
         public static final Set<String> IGNORED_ATTRIBUTES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-                STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
-                "uuid", "filename", "path"
-        )));
+            STATUS_CODE, STATUS_MESSAGE, RESPONSE_BODY, REQUEST_URL, TRANSACTION_ID, REMOTE_DN,
+            "uuid", "filename", "path")));
 
         // properties
         public static final PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
-                .name("HTTP Method")
-                .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
-                .required(true)
-                .defaultValue("GET")
-                .expressionLanguageSupported(true)
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .build();
+            .name("HTTP Method")
+            .description("HTTP request method (GET, POST, PUT, DELETE, HEAD, OPTIONS).")
+            .required(true)
+            .defaultValue("GET")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
-                .name("Remote URL")
-                .description("Remote URL which will be connected to, including scheme, host, port, path.")
-                .required(true)
-                .expressionLanguageSupported(true)
-                .addValidator(StandardValidators.URL_VALIDATOR)
-                .build();
+            .name("Remote URL")
+            .description("Remote URL which will be connected to, including scheme, host, port, path.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
-                .name("Connection Timeout")
-                .description("Max wait time for connection to remote service.")
-                .required(true)
-                .defaultValue("5 secs")
-                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-                .build();
+            .name("Connection Timeout")
+            .description("Max wait time for connection to remote service.")
+            .required(true)
+            .defaultValue("5 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder()
-                .name("Read Timeout")
-                .description("Max wait time for response from remote service.")
-                .required(true)
-                .defaultValue("15 secs")
-                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-                .build();
+            .name("Read Timeout")
+            .description("Max wait time for response from remote service.")
+            .required(true)
+            .defaultValue("15 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_DATE_HEADER = new PropertyDescriptor.Builder()
-                .name("Include Date Header")
-                .description("Include an RFC-2616 Date header in the request.")
-                .required(true)
-                .defaultValue("True")
-                .allowableValues("True", "False")
-                .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-                .build();
+            .name("Include Date Header")
+            .description("Include an RFC-2616 Date header in the request.")
+            .required(true)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_FOLLOW_REDIRECTS = new PropertyDescriptor.Builder()
-                .name("Follow Redirects")
-                .description("Follow HTTP redirects issued by remote server.")
-                .required(true)
-                .defaultValue("True")
-                .allowableValues("True", "False")
-                .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-                .build();
+            .name("Follow Redirects")
+            .description("Follow HTTP redirects issued by remote server.")
+            .required(true)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_ATTRIBUTES_TO_SEND = new PropertyDescriptor.Builder()
-                .name("Attributes to Send")
-                .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
-                        + "If not defined, no attributes are sent as headers.")
-                .required(false)
-                .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-                .build();
+            .name("Attributes to Send")
+            .description("Regular expression that defines which attributes to send as HTTP headers in the request. "
+                + "If not defined, no attributes are sent as headers.")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
 
         public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-                .name("SSL Context Service")
-                .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
-                .required(false)
-                .identifiesControllerService(SSLContextService.class)
-                .build();
+            .name("SSL Context Service")
+            .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
 
         public static final PropertyDescriptor PROP_PROXY_HOST = new PropertyDescriptor.Builder()
             .name("Proxy Host")
@@ -256,33 +255,33 @@ public final class InvokeHTTP extends AbstractProcessor {
             .build();
 
         // Per RFC 7235, 2617, and 2616.
-        //      basic-credentials   = base64-user-pass
-        //      base64-user-pass    = userid ":" password
-        //      userid              = *<TEXT excluding ":">
-        //      password            = *TEXT
+        // basic-credentials = base64-user-pass
+        // base64-user-pass = userid ":" password
+        // userid = *<TEXT excluding ":">
+        // password = *TEXT
         //
-        //      OCTET          = <any 8-bit sequence of data>
-        //      CTL            = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
-        //      LWS            = [CRLF] 1*( SP | HT )
-        //      TEXT           = <any OCTET except CTLs but including LWS>
+        // OCTET = <any 8-bit sequence of data>
+        // CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
+        // LWS = [CRLF] 1*( SP | HT )
+        // TEXT = <any OCTET except CTLs but including LWS>
         //
         // Per RFC 7230, username & password in URL are now disallowed in HTTP and HTTPS URIs.
         public static final PropertyDescriptor PROP_BASIC_AUTH_USERNAME = new PropertyDescriptor.Builder()
-                .name("Basic Authentication Username")
-                .displayName("Basic Authentication Username")
-                .description("The username to be used by the client to authenticate against the Remote URL.  Cannot include control characters (0-31), ':', or DEL (127).")
-                .required(false)
-                .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
-                .build();
+            .name("Basic Authentication Username")
+            .displayName("Basic Authentication Username")
+            .description("The username to be used by the client to authenticate against the Remote URL.  Cannot include control characters (0-31), ':', or DEL (127).")
+            .required(false)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
+            .build();
 
         public static final PropertyDescriptor PROP_BASIC_AUTH_PASSWORD = new PropertyDescriptor.Builder()
-                .name("Basic Authentication Password")
-                .displayName("Basic Authentication Password")
-                .description("The password to be used by the client to authenticate against the Remote URL.")
-                .required(false)
-                .sensitive(true)
-                .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
-                .build();
+            .name("Basic Authentication Password")
+            .displayName("Basic Authentication Password")
+            .description("The password to be used by the client to authenticate against the Remote URL.")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
+            .build();
 
         public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             PROP_METHOD,
@@ -296,48 +295,46 @@ public final class InvokeHTTP extends AbstractProcessor {
             PROP_BASIC_AUTH_USERNAME,
             PROP_BASIC_AUTH_PASSWORD,
             PROP_PROXY_HOST,
-            PROP_PROXY_PORT
-        ));
+            PROP_PROXY_PORT));
 
         // property to allow the hostname verifier to be overridden
         // this is a "hidden" property - it's configured using a dynamic user property
         public static final PropertyDescriptor PROP_TRUSTED_HOSTNAME = new PropertyDescriptor.Builder()
-                .name("Trusted Hostname")
-                .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
-                        + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
-                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-                .dynamic(true)
-                .build();
+            .name("Trusted Hostname")
+            .description("Bypass the normal truststore hostname verifier to allow the specified (single) remote hostname as trusted "
+                + "Enabling this property has MITM security implications, use wisely. Only valid with SSL (HTTPS) connections.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .dynamic(true)
+            .build();
 
         // relationships
         public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
-                .name("Original")
-                .description("Original FlowFile will be routed upon success (2xx status codes).")
-                .build();
+            .name("Original")
+            .description("Original FlowFile will be routed upon success (2xx status codes).")
+            .build();
 
         public static final Relationship REL_SUCCESS_RESP = new Relationship.Builder()
-                .name("Response")
-                .description("Response FlowFile will be routed upon success (2xx status codes).")
-                .build();
+            .name("Response")
+            .description("Response FlowFile will be routed upon success (2xx status codes).")
+            .build();
 
         public static final Relationship REL_RETRY = new Relationship.Builder()
-                .name("Retry")
-                .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
-                .build();
+            .name("Retry")
+            .description("FlowFile will be routed on any status code that can be retried (5xx status codes).")
+            .build();
 
         public static final Relationship REL_NO_RETRY = new Relationship.Builder()
-                .name("No Retry")
-                .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
-                .build();
+            .name("No Retry")
+            .description("FlowFile will be routed on any status code that should NOT be retried (1xx, 3xx, 4xx status codes).")
+            .build();
 
         public static final Relationship REL_FAILURE = new Relationship.Builder()
-                .name("Failure")
-                .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
-                .build();
+            .name("Failure")
+            .description("FlowFile will be routed on any type of connection failure, timeout or general exception.")
+            .build();
 
         public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-                REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE
-        )));
+            REL_SUCCESS_REQ, REL_SUCCESS_RESP, REL_RETRY, REL_NO_RETRY, REL_FAILURE)));
 
     }
 
@@ -403,7 +400,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 transfer();
             } catch (final Exception e) {
                 // log exception
-                logger.error("Routing to {} due to exception: {}", new Object[] { REL_FAILURE.getName(), e }, e);
+                logger.error("Routing to {} due to exception: {}", new Object[] {REL_FAILURE.getName(), e}, e);
 
                 // penalize
                 request = session.penalize(request);
@@ -417,7 +414,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                         session.remove(response);
                     }
                 } catch (final Exception e1) {
-                    logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] { e1 }, e1);
+                    logger.error("Could not cleanup response flowfile due to exception: {}", new Object[] {e1}, e1);
                 }
             }
         }
@@ -545,7 +542,7 @@ public final class InvokeHTTP extends AbstractProcessor {
 
                         // emit provenance event
                         final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                        session.getProvenanceReporter().modifyContent(response, "Updated content with data received from " + conn.getURL().toExternalForm(), millis);
+                        session.getProvenanceReporter().fetch(response, conn.getURL().toExternalForm(), millis);
                     }
 
                 }
@@ -562,7 +559,7 @@ public final class InvokeHTTP extends AbstractProcessor {
 
             // log the status codes from the response
             logger.info("Request to {} returned status code {} for {}",
-                    new Object[]{conn.getURL().toExternalForm(), statusCode, request});
+                new Object[] {conn.getURL().toExternalForm(), statusCode, request});
 
             // transfer to the correct relationship
             // 2xx -> SUCCESS
@@ -660,12 +657,12 @@ public final class InvokeHTTP extends AbstractProcessor {
 
         private void logRequest() {
             logger.debug("\nRequest to remote service:\n\t{}\n{}",
-                    new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
+                new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getRequestProperties())});
         }
 
         private void logResponse() {
             logger.debug("\nResponse from remote service:\n\t{}\n{}",
-                    new Object[]{conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
+                new Object[] {conn.getURL().toExternalForm(), getLogString(conn.getHeaderFields())});
         }
 
         private String getLogString(Map<String, List<String>> map) {
@@ -753,7 +750,7 @@ public final class InvokeHTTP extends AbstractProcessor {
                 return new BufferedInputStream(is);
 
             } catch (IOException e) {
-                logger.warn("Response stream threw an exception: {}", new Object[]{e}, e);
+                logger.warn("Response stream threw an exception: {}", new Object[] {e}, e);
                 return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/51f56402/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
index 46cacca..a4fd3d7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestInvokeHTTP.java
@@ -149,8 +149,8 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_NO_RETRY, 0);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
 
-        //expected in request status.code and status.message
-        //original flow file (+attributes)??????????
+        // expected in request status.code and status.message
+        // original flow file (+attributes)??????????
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
         bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
         bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
@@ -159,10 +159,10 @@ public class TestInvokeHTTP {
         Assert.assertEquals(expected, actual);
         bundle.assertAttributeEquals("Foo", "Bar");
 
-        //expected in response
-        //status code, status message, all headers from server response --> ff attributes
-        //server response message body into payload of ff
-        //should not contain any original ff attributes
+        // expected in response
+        // status code, status message, all headers from server response --> ff attributes
+        // server response message body into payload of ff
+        // should not contain any original ff attributes
         final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
         bundle1.assertContentEquals("/status/200".getBytes("UTF-8"));
         bundle1.assertAttributeEquals(Config.STATUS_CODE, "200");
@@ -198,8 +198,8 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_NO_RETRY, 0);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
 
-        //expected in request status.code and status.message
-        //original flow file (+attributes)??????????
+        // expected in request status.code and status.message
+        // original flow file (+attributes)??????????
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_REQ).get(0);
         bundle.assertAttributeEquals(Config.STATUS_CODE, "200");
         bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "OK");
@@ -208,10 +208,10 @@ public class TestInvokeHTTP {
         final String expected = "Hello";
         Assert.assertEquals(expected, actual);
 
-        //expected in response
-        //status code, status message, all headers from server response --> ff attributes
-        //server response message body into payload of ff
-        //should not contain any original ff attributes
+        // expected in response
+        // status code, status message, all headers from server response --> ff attributes
+        // server response message body into payload of ff
+        // should not contain any original ff attributes
         final MockFlowFile bundle1 = runner.getFlowFilesForRelationship(Config.REL_SUCCESS_RESP).get(0);
         final String bundle1Content = new String(bundle1.toByteArray(), StandardCharsets.UTF_8);
         assertTrue(bundle1Content.startsWith(expAuth)); // use startsWith instead of equals so we can ignore line endings
@@ -223,17 +223,17 @@ public class TestInvokeHTTP {
         final List<ProvenanceEventRecord> provEvents = runner.getProvenanceEvents();
         assertEquals(2, provEvents.size());
         boolean forkEvent = false;
-        boolean contentModEvent = false;
+        boolean fetchEvent = false;
         for (final ProvenanceEventRecord event : provEvents) {
             if (event.getEventType() == ProvenanceEventType.FORK) {
                 forkEvent = true;
-            } else if (event.getEventType() == ProvenanceEventType.CONTENT_MODIFIED) {
-                contentModEvent = true;
+            } else if (event.getEventType() == ProvenanceEventType.FETCH) {
+                fetchEvent = true;
             }
         }
 
         assertTrue(forkEvent);
-        assertTrue(contentModEvent);
+        assertTrue(fetchEvent);
     }
 
     @Test
@@ -257,8 +257,8 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_NO_RETRY, 1);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
 
-        //expected in request status.code and status.message
-        //original flow file (+attributes)??????????
+        // expected in request status.code and status.message
+        // original flow file (+attributes)??????????
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
         bundle.assertAttributeEquals(Config.STATUS_CODE, "401");
         bundle.assertAttributeEquals(Config.STATUS_MESSAGE, "Unauthorized");
@@ -286,7 +286,7 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_NO_RETRY, 0);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
 
-        //expected in response
+        // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
         bundle.assertAttributeEquals(Config.STATUS_CODE, "500");
@@ -313,8 +313,8 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_RETRY, 0);
         runner.assertTransferCount(Config.REL_NO_RETRY, 1);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
-        //getMyFlowFiles();
-        //expected in response
+        // getMyFlowFiles();
+        // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
 
@@ -340,8 +340,8 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_RETRY, 0);
         runner.assertTransferCount(Config.REL_NO_RETRY, 1);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
-        //getMyFlowFiles();
-        //expected in response
+        // getMyFlowFiles();
+        // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
 
@@ -367,8 +367,8 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_RETRY, 0);
         runner.assertTransferCount(Config.REL_NO_RETRY, 1);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
-        //getMyFlowFiles();
-        //expected in response
+        // getMyFlowFiles();
+        // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
 
@@ -397,7 +397,7 @@ public class TestInvokeHTTP {
         runner.assertTransferCount(Config.REL_NO_RETRY, 1);
         runner.assertTransferCount(Config.REL_FAILURE, 0);
 
-        //expected in response
+        // expected in response
         final MockFlowFile bundle = runner.getFlowFilesForRelationship(Config.REL_NO_RETRY).get(0);
         final String actual = new String(bundle.toByteArray(), StandardCharsets.UTF_8);
 
@@ -593,7 +593,7 @@ public class TestInvokeHTTP {
 
         @Override
         public void handle(String target, Request baseRequest,
-                HttpServletRequest request, HttpServletResponse response)
+            HttpServletRequest request, HttpServletResponse response)
                 throws IOException, ServletException {
 
             baseRequest.setHandled(true);


[3/3] nifi git commit: NIFI-10: Fixed checkstyle violation

Posted by ma...@apache.org.
NIFI-10: Fixed checkstyle violation


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/17006335
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/17006335
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/17006335

Branch: refs/heads/master
Commit: 17006335e5b687a8a268d1571ae04a98235e6394
Parents: fc2aa27
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 26 17:09:51 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Oct 26 17:09:51 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/provenance/ProvenanceReporter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/17006335/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
index 0fd29fd..39eed43 100644
--- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
+++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceReporter.java
@@ -159,7 +159,7 @@ public interface ProvenanceReporter {
      * the data
      */
     void fetch(FlowFile flowFile, String transitUri, String details, long transmissionMillis);
-    
+
     /**
      * Emits a Provenance Event of type {@link ProvenanceEventType#SEND SEND}
      * that indicates that a copy of the given FlowFile was sent to an external