You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:09 UTC

[18/20] storm git commit: Addressing another review comment from Arun about releasing lock file on exception.

Addressing another review comment from Arun about releasing  lock file on exception.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2c02bc91
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2c02bc91
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2c02bc91

Branch: refs/heads/master
Commit: 2c02bc91d8a9b81b55a4e023c927a73068bcc927
Parents: 0b07f8b
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Jan 7 19:23:19 2016 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:57 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 45 +++++++++++++-------
 1 file changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2c02bc91/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 994d87e..5428570 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -508,9 +508,16 @@ public class HdfsSpout extends BaseRichSpout {
           LOG.debug("Unable to get FileLock for {}, so skipping it.", file);
           continue; // could not lock, so try another file.
         }
-        LOG.info("Processing : {} ", file);
-        Path newFile = renameToInProgressFile(file);
-        return createFileReader(newFile);
+        try {
+          Path newFile = renameToInProgressFile(file);
+          FileReader result = createFileReader(newFile);
+          LOG.info("Processing : {} ", file);
+          return result;
+        } catch (Exception e) {
+          LOG.error("Skipping file " + file, e);
+          releaseLockAndLog(lock, spoutId);
+          continue;
+        }
       }
 
       return null;
@@ -599,7 +606,7 @@ public class HdfsSpout extends BaseRichSpout {
       return (FileReader) constructor.newInstance(this.hdfs, file, conf);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
-      throw new RuntimeException("Unable to instantiate " + readerType, e);
+      throw new RuntimeException("Unable to instantiate " + readerType + " reader", e);
     }
   }
 
@@ -638,10 +645,14 @@ public class HdfsSpout extends BaseRichSpout {
   private Path renameToInProgressFile(Path file)
           throws IOException {
     Path newFile =  new Path( file.toString() + inprogress_suffix );
-    if (hdfs.rename(file, newFile)) {
-      return newFile;
+    try {
+      if (hdfs.rename(file, newFile)) {
+        return newFile;
+      }
+      throw new RenameException(file, newFile);
+    } catch (IOException e){
+      throw new RenameException(file, newFile, e);
     }
-    throw new IOException("Rename of " + file + " to " + newFile + " failed");
   }
 
   /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
@@ -709,16 +720,20 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
-  private static class RenameFailedException extends IOException {
-    public final Path file;
-    public RenameFailedException(Path file) {
-      super("Rename failed for file: " + file);
-      this.file = file;
+  private static class RenameException extends IOException {
+    public final Path oldFile;
+    public final Path newFile;
+
+    public RenameException(Path oldFile, Path newFile) {
+      super("Rename of " + oldFile + " to " + newFile + " failed");
+      this.oldFile = oldFile;
+      this.newFile = newFile;
     }
 
-    public RenameFailedException(Path file, IOException e) {
-      super("Rename failed for file: " + file, e);
-      this.file = file;
+    public RenameException(Path oldFile, Path newFile, IOException cause) {
+      super("Rename of " + oldFile + " to " + newFile + " failed", cause);
+      this.oldFile = oldFile;
+      this.newFile = newFile;
     }
   }
 }