You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/15 20:28:17 UTC

incubator-nifi git commit: NIFI-35: Provide an EventReporter to the FlowFileSwapManager and provide events for any errors

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-35 [created] 1cc3ce575


NIFI-35: Provide an EventReporter to the FlowFileSwapManager and provide events for any errors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1cc3ce57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1cc3ce57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1cc3ce57

Branch: refs/heads/NIFI-35
Commit: 1cc3ce57556eb7cf9a5f94b269eb24b284e518eb
Parents: 9e60aa0
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Dec 15 14:28:11 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Dec 15 14:28:11 2014 -0500

----------------------------------------------------------------------
 .../nifi/controller/FileSystemSwapManager.java  | 33 +++++++++++++++-----
 .../apache/nifi/controller/FlowController.java  | 20 +++++++-----
 .../repository/FlowFileSwapManager.java         |  5 ++-
 3 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1cc3ce57/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 3af2098..ad95f8e 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -61,11 +61,12 @@ import org.apache.nifi.controller.repository.StandardFlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.io.BufferedOutputStream;
 import org.apache.nifi.processor.QueueSize;
+import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,10 +81,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
     public static final int MINIMUM_SWAP_COUNT = 10000;
     private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
     public static final int SWAP_ENCODING_VERSION = 6;
+    public static final String EVENT_CATEGORY = "Swap FlowFiles";
 
     private final ScheduledExecutorService swapQueueIdentifierExecutor;
     private final ScheduledExecutorService swapInExecutor;
     private volatile FlowFileRepository flowFileRepository;
+    private volatile EventReporter eventReporter;
 
     // Maintains a mapping of FlowFile Queue to the a QueueLockWrapper, which provides queue locking and necessary state for swapping back in
     private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<>();
@@ -129,9 +132,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
-    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager) {
+    public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) {
         this.claimManager = claimManager;
         this.flowFileRepository = flowFileRepository;
+        this.eventReporter = eventReporter;
         swapQueueIdentifierExecutor.scheduleWithFixedDelay(new QueueIdentifier(connectionProvider), swapOutMillis, swapOutMillis, TimeUnit.MILLISECONDS);
         swapInExecutor.scheduleWithFixedDelay(new SwapInTask(), swapInMillis, swapInMillis, TimeUnit.MILLISECONDS);
     }
@@ -437,10 +441,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                                 }
 
                                 if (!swapFile.delete()) {
-                                    logger.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file can be cleaned up manually");
+                                    final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
+                                    logger.warn(errMsg);
+                                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
                                 }
                             } catch (final Exception e) {
-                                logger.error("Failed to Swap In FlowFiles for {} due to {}", new Object[]{flowFileQueue, e.toString()}, e);
+                                final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
+                                logger.error(errMsg);
+                                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                                
                                 if (swapFile != null) {
                                     queue.add(swapFile);
                                 }
@@ -488,7 +497,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     } catch (final IOException ioe) {
                         recordsSwapped = 0;
                         flowFileQueue.putSwappedRecords(toSwap);
-                        logger.error("Failed to swap out {} FlowFiles from {} to Swap File {} due to {}", new Object[]{toSwap.size(), flowFileQueue, swapLocation, ioe.toString()}, ioe);
+                        final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
+                        logger.error(errMsg);
+                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
                     }
 
                     if (recordsSwapped > 0) {
@@ -549,14 +560,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
 
                 final int swapEncodingVersion = in.readInt();
                 if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
-                    throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
-                            + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
+                    final String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is "
+                            + swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)";
+
+                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                    throw new IOException(errMsg);
                 }
 
                 final String connectionId = in.readUTF();
                 final FlowFileQueue queue = queueMap.get(connectionId);
                 if (queue == null) {
                     logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
+                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
                     continue;
                 }
 
@@ -579,7 +594,9 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     maxRecoveredId = maxId;
                 }
             } catch (final IOException ioe) {
-                logger.error("Cannot recover Swapped FlowFiles from Swap File {} due to {}", swapFile, ioe.toString());
+                final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
+                logger.error(errMsg);
+                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
                 if (logger.isDebugEnabled()) {
                     logger.error("", ioe);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1cc3ce57/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index e1abe4e..545017a 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -388,13 +388,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
         try {
             this.provenanceEventRepository = createProvenanceRepository(properties);
-            this.provenanceEventRepository.initialize(new EventReporter() {
-                @Override
-                public void reportEvent(final Severity severity, final String category, final String message) {
-                    final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
-                    bulletinRepository.addBulletin(bulletin);
-                }
-            });
+            this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
 
             this.contentRepository = createContentRepository(properties);
         } catch (final Exception e) {
@@ -516,6 +510,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         }
     }
 
+    private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) { // Builds an EventReporter that forwards every reported event to the given BulletinRepository.
+        return new EventReporter() {
+            @Override
+            public void reportEvent(final Severity severity, final String category, final String message) {
+                final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message); // Wrap the event as a Bulletin built from the category, severity.name() and the message.
+                bulletinRepository.addBulletin(bulletin); // Hand the bulletin to the repository captured from the enclosing method's parameter.
+            }
+        };
+    }
+    
     public void initializeFlow() throws IOException {
         writeLock.lock();
         try {
@@ -537,7 +541,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
             contentRepository.cleanup();
 
             if (flowFileSwapManager != null) {
-                flowFileSwapManager.start(flowFileRepository, this, contentClaimManager);
+                flowFileSwapManager.start(flowFileRepository, this, contentClaimManager, createEventReporter(bulletinRepository));
             }
 
             if (externalSiteListener != null) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1cc3ce57/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
index 739cb2b..c6daab8 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.claim.ContentClaimManager;
+import org.apache.nifi.events.EventReporter;
 
 /**
  * Defines a mechanism by which FlowFiles can be move into external storage or
@@ -34,8 +35,10 @@ public interface FlowFileSwapManager {
      * can be obtained and restored
      * @param claimManager the ContentClaimManager to use for interacting with
      * Content Claims
+     * @param reporter the EventReporter that can be used for notifying users of
+     * important events
      */
-    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager);
+    void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter);
 
     /**
      * Shuts down the manager