You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/30 15:10:04 UTC

[1/3] incubator-nifi git commit: NIFI-33: Ensure that we sync to disk, then rename file when swapping out; delete partial files on restart; destroy corrupt files on EOFException/FileNotFoundException when swapping in

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 0a0b7e05a -> bfe39c0b8


 NIFI-33: Ensure that we sync to disk, then rename file when swapping out; delete partial files on restart; destroy corrupt files on EOFException/FileNotFoundException when swapping in


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/ece5ce14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/ece5ce14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/ece5ce14

Branch: refs/heads/develop
Commit: ece5ce1409925bdb5a4e02476413d67d2d11ea6e
Parents: 1cc3ce5
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Dec 15 14:57:11 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Dec 15 14:57:11 2014 -0500

----------------------------------------------------------------------
 .../nifi/controller/FileSystemSwapManager.java  | 85 ++++++++++++++------
 1 file changed, 62 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/ece5ce14/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index ad95f8e..bf76bad 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -80,6 +81,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
 
     public static final int MINIMUM_SWAP_COUNT = 10000;
     private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
+    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
+    
     public static final int SWAP_ENCODING_VERSION = 6;
     public static final String EVENT_CATEGORY = "Swap FlowFiles";
 
@@ -441,14 +444,18 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                                 }
 
                                 if (!swapFile.delete()) {
-                                    final String errMsg = "Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually";
-                                    logger.warn(errMsg);
-                                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, errMsg);
+                                    warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
                                 }
+                            } catch (final EOFException eof) {
+                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
+                                
+                                if ( !swapFile.delete() ) {
+                                    warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
+                                }
+                            } catch (final FileNotFoundException fnfe) {
+                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
                             } catch (final Exception e) {
-                                final String errMsg = "Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e;
-                                logger.error(errMsg);
-                                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                                error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
                                 
                                 if (swapFile != null) {
                                     queue.add(swapFile);
@@ -463,8 +470,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         }
     }
 
+    private void error(final String error, final Throwable t) {
+        error(error);
+        if ( logger.isDebugEnabled() ) {
+            logger.error("", t);
+        }
+    }
+    
+    private void error(final String error) {
+        logger.error(error);
+        if ( eventReporter != null ) {
+            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
+        }
+    }
+    
+    private void warn(final String warning) {
+        logger.warn(warning);
+        if ( eventReporter != null ) {
+            eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
+        }
+    }
+    
+    
     private class SwapOutTask implements Runnable {
-
         private final BlockingQueue<FlowFileQueue> connectionQueue;
 
         public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) {
@@ -486,20 +514,27 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
 
                 while (flowFileQueue.getSwapQueueSize() >= MINIMUM_SWAP_COUNT) {
                     final File swapFile = new File(storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
+                    final File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
                     final String swapLocation = swapFile.getAbsolutePath();
                     final List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
 
                     int recordsSwapped;
-                    try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
-                        recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
-                        flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
-                        fos.getFD().sync();
+                    try {
+                        try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
+                            recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+                            fos.getFD().sync();
+                        }
+                        
+                        if ( swapTempFile.renameTo(swapFile) ) {
+                            flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
+                        } else {
+                            error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
+                            recordsSwapped = 0;
+                        }
                     } catch (final IOException ioe) {
                         recordsSwapped = 0;
                         flowFileQueue.putSwappedRecords(toSwap);
-                        final String errMsg = "Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe;
-                        logger.error(errMsg);
-                        eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
+                        error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe);
                     }
 
                     if (recordsSwapped > 0) {
@@ -533,7 +568,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
             @Override
             public boolean accept(final File dir, final String name) {
-                return SWAP_FILE_PATTERN.matcher(name).matches();
+                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
             }
         });
 
@@ -553,6 +588,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
         long maxRecoveredId = 0L;
 
         for (final File swapFile : swapFiles) {
+            if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) {
+                if ( swapFile.delete() ) {
+                    logger.info("Removed incomplete/temporary Swap File " + swapFile);
+                } else {
+                    warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
+                }
+                
+                continue;
+            }
+            
             // read record to disk via the swap file
             try (final InputStream fis = new FileInputStream(swapFile);
                     final InputStream bufferedIn = new BufferedInputStream(fis);
@@ -570,8 +615,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                 final String connectionId = in.readUTF();
                 final FlowFileQueue queue = queueMap.get(connectionId);
                 if (queue == null) {
-                    logger.error("Cannot recover Swapped FlowFiles from Swap File {} because the FlowFiles belong to a Connection with ID {} and that Connection does not exist", swapFile, connectionId);
-                    eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
+                    error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
                     continue;
                 }
 
@@ -594,12 +638,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
                     maxRecoveredId = maxId;
                 }
             } catch (final IOException ioe) {
-                final String errMsg = "Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe;
-                logger.error(errMsg);
-                eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
-                if (logger.isDebugEnabled()) {
-                    logger.error("", ioe);
-                }
+                error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe);
             }
         }
 


[2/3] incubator-nifi git commit: NIFI-33: On failure, was attempting to remove non-existent file; fixed this

Posted by ma...@apache.org.
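NIFI-33: On failure to swap out, the cleanup code was attempting to delete the final swap file, which does not exist at that point; fixed this so that it deletes the temporary (.part) swap file instead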


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/77971847
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/77971847
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/77971847

Branch: refs/heads/develop
Commit: 77971847129133bef0d9340e44b23c8d352be303
Parents: ece5ce1
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Dec 15 15:02:44 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Dec 15 15:02:44 2014 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/controller/FileSystemSwapManager.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77971847/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index bf76bad..c1ebc97 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -549,7 +549,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
 
                         swapQueue.getQueue().add(swapFile);
                     } else {
-                        swapFile.delete();
+                        swapTempFile.delete();
                     }
                 }
             }


[3/3] incubator-nifi git commit: Merge branch 'NIFI-33' into develop

Posted by ma...@apache.org.
Merge branch 'NIFI-33' into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bfe39c0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bfe39c0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bfe39c0b

Branch: refs/heads/develop
Commit: bfe39c0b82dbf671bebfbda26b11d657a8adba2a
Parents: 0a0b7e0 7797184
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Dec 30 09:08:19 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Dec 30 09:08:19 2014 -0500

----------------------------------------------------------------------
 .../nifi/controller/FileSystemSwapManager.java  | 87 ++++++++++++++------
 1 file changed, 63 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bfe39c0b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------