Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:55:59 UTC

[08/20] storm git commit: Addressing review comments from Arun.

Addressing review comments from Arun.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0b07f8b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0b07f8b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0b07f8b3

Branch: refs/heads/master
Commit: 0b07f8b3a8a458f39cc9f64be1e5623b0a6815d2
Parents: b5240a7
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Jan 7 16:25:45 2016 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 external/storm-hdfs/README.md                   | 10 ++--
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 50 ++++++++++++++------
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |  1 -
 3 files changed, 41 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0b07f8b3/external/storm-hdfs/README.md
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md
index 8008bd8..bf63ad9 100644
--- a/external/storm-hdfs/README.md
+++ b/external/storm-hdfs/README.md
@@ -452,9 +452,9 @@ before selecting the next file for consumption.
 **Lock on *.lock* Directory**
 Hdfs spout instances create a *DIRLOCK* file in the .lock directory to co-ordinate certain accesses to 
 the .lock dir itself. A spout will try to create it when it needs access to the .lock directory and
-then delete it when done.  In case of a topology crash or force kill, this file may not get deleted.
-In this case it should be deleted manually to allow the new topology instance to regain  full access 
-to the  .lock  directory and resume normal processing. 
+then delete it when done.  In error conditions such as a topology crash, force kill or untimely death 
+of a spout, this file may not get deleted. Spout instances that run later will eventually reclaim it
+once the DIRLOCK file becomes stale, i.e. after hdfsspout.lock.timeout.sec seconds of inactivity.
 
 ## Usage
 
@@ -515,13 +515,13 @@ Only settings mentioned in **bold** are required.
 |**hdfsspout.source.dir**      |             | HDFS location from where to read.  E.g. /data/inputfiles  |
 |**hdfsspout.archive.dir**     |             | After a file is processed completely it will be moved to this directory. E.g. /data/done|
 |**hdfsspout.badfiles.dir**    |             | if there is an error parsing a file's contents, the file is moved to this location.  E.g. /data/badfiles  |
-|hdfsspout.lock.dir            | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file the spout instance creates a lock file in this directory with same name as input file and deletes this lock file after processing the file. Spout also periodically makes a note of its progress (wrt reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason. When a toplogy is killed, if a .lock/DIRLOCK file is left behind it can be safely deleted to allow normal resumption of the topology on restart.|
+|hdfsspout.lock.dir            | '.lock' subdirectory under hdfsspout.source.dir | Dir in which lock files will be created. Concurrent HDFS spout instances synchronize using *lock* files. Before processing a file, the spout instance creates a lock file in this directory with the same name as the input file, and deletes this lock file after processing the file. Spouts also periodically make a note of their progress (with respect to reading the input file) in the lock file so that another spout instance can resume progress on the same file if the spout dies for any reason.|
 |hdfsspout.ignore.suffix       |   .ignore   | File names with this suffix in the hdfsspout.source.dir location will not be processed|
 |hdfsspout.commit.count        |    20000    | Record progress in the lock file after these many records are processed. If set to 0, this criterion will not be used. |
 |hdfsspout.commit.sec          |    10       | Record progress in the lock file after these many seconds have elapsed. Must be greater than 0 |
 |hdfsspout.max.outstanding     |   10000     | Limits the number of unACKed tuples by pausing tuple generation (if ACKers are used in the topology) |
 |hdfsspout.lock.timeout.sec    |  5 minutes  | Duration of inactivity after which a lock file is considered to be abandoned and ready for another spout to take ownership |
-|hdfsspout.clocks.insync       |    true     | Indicates whether clocks on the storm machines are in sync (using services like NTP)       |
+|hdfsspout.clocks.insync       |    true     | Indicates whether clocks on the storm machines are in sync (using services like NTP). Used for detecting stale locks. |
 |hdfs.config (unless changed)  |             | Set it to a Map of key/value pairs indicating the HDFS settings to be used. For example, keytab and principal could be set using this. See section **Using keytabs on all worker hosts** under HDFS bolt below.| 
 
 ---
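
Note on the stale-lock rule described in the README change above: a lock is treated as abandoned once it has been inactive for hdfsspout.lock.timeout.sec seconds. The following is only a minimal sketch of that comparison, not the spout's actual code. The class StaleLockCheck and the method isStale are hypothetical names, and the sketch assumes the last-activity timestamp and the current time come from clocks that are in sync (the situation hdfsspout.clocks.insync describes).

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only; not part of storm-hdfs.
class StaleLockCheck {

    // A lock counts as stale when the time since its last recorded
    // activity is at least the configured timeout.
    static boolean isStale(Instant lastActivity, Instant now, Duration timeout) {
        return Duration.between(lastActivity, now).compareTo(timeout) >= 0;
    }

    public static void main(String[] args) {
        // 5 minutes is the documented default for hdfsspout.lock.timeout.sec.
        Duration timeout = Duration.ofMinutes(5);
        Instant now = Instant.parse("2016-01-14T12:00:00Z");

        // Last activity 301 seconds ago: past the timeout, so stale.
        System.out.println(isStale(now.minusSeconds(301), now, timeout)); // prints true

        // Last activity 60 seconds ago: still considered owned.
        System.out.println(isStale(now.minusSeconds(60), now, timeout));  // prints false
    }
}
```

Whether the boundary case (exactly the timeout) counts as stale is a choice made for this sketch; the README text does not say.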

http://git-wip-us.apache.org/repos/asf/storm/blob/0b07f8b3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 93d08d5..994d87e 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -258,13 +258,19 @@ public class HdfsSpout extends BaseRichSpout {
 
     reader.close();
     reader = null;
+    releaseLockAndLog(lock, spoutId);
+    lock = null;
+  }
+
+  private static void releaseLockAndLog(FileLock fLock, String spoutId) {
     try {
-      lock.release();
-      LOG.debug("Spout {} released FileLock. SpoutId = {}", lock.getLockFile(), spoutId);
+      if(fLock!=null) {
+        fLock.release();
+        LOG.debug("Spout {} released FileLock. SpoutId = {}", fLock.getLockFile(), spoutId);
+      }
     } catch (IOException e) {
-      LOG.error("Unable to delete lock file : " + this.lock.getLockFile() + " SpoutId =" + spoutId, e);
+      LOG.error("Unable to delete lock file : " + fLock.getLockFile() + " SpoutId =" + spoutId, e);
     }
-    lock = null;
   }
 
   protected void emitData(List<Object> tuple, MessageId id) {
@@ -475,7 +481,7 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
-  private FileReader pickNextFile()  {
+  private FileReader pickNextFile() {
     try {
       // 1) If there are any abandoned files, pick oldest one
       lock = getOldestExpiredLock();
@@ -491,19 +497,19 @@ public class HdfsSpout extends BaseRichSpout {
       Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
 
       for (Path file : listing) {
-        if( file.getName().endsWith(inprogress_suffix) ) {
+        if (file.getName().endsWith(inprogress_suffix)) {
           continue;
         }
-        if( file.getName().endsWith(ignoreSuffix) ) {
+        if (file.getName().endsWith(ignoreSuffix)) {
           continue;
         }
         lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
-        if( lock==null ) {
+        if (lock == null) {
           LOG.debug("Unable to get FileLock for {}, so skipping it.", file);
           continue; // could not lock, so try another file.
         }
         LOG.info("Processing : {} ", file);
-        Path newFile = renameSelectedFile(file);
+        Path newFile = renameToInProgressFile(file);
         return createFileReader(newFile);
       }
 
@@ -624,14 +630,18 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
-  // returns new path of renamed file
-  private Path renameSelectedFile(Path file)
+  /**
+   * Renames the file by appending the .inprogress suffix
+   * @return path of renamed file
+   * @throws IOException if the rename fails
+   */
+  private Path renameToInProgressFile(Path file)
           throws IOException {
     Path newFile =  new Path( file.toString() + inprogress_suffix );
-    if( ! hdfs.rename(file, newFile) ) {
-      throw new IOException("Rename failed for file: " + file);
+    if (hdfs.rename(file, newFile)) {
+      return newFile;
     }
-    return newFile;
+    throw new IOException("Rename of " + file + " to " + newFile + " failed");
   }
 
   /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
@@ -699,4 +709,16 @@ public class HdfsSpout extends BaseRichSpout {
     }
   }
 
+  private static class RenameFailedException extends IOException {
+    public final Path file;
+    public RenameFailedException(Path file) {
+      super("Rename failed for file: " + file);
+      this.file = file;
+    }
+
+    public RenameFailedException(Path file, IOException e) {
+      super("Rename failed for file: " + file, e);
+      this.file = file;
+    }
+  }
 }
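
Note on renameToInProgressFile above: the method turns a boolean "rename failed" result into an IOException that names both the source and the target. The sketch below shows the same pattern in isolation. It deliberately swaps the HDFS call for java.io.File.renameTo on the local filesystem so that it runs without Hadoop; RenameSketch and renameToInProgress are hypothetical names, and the ".inprogress" literal is an assumption taken from the Javadoc wording rather than from the value of inprogress_suffix.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical illustration only; uses the local filesystem, not HDFS.
class RenameSketch {

    // Assumed suffix value, based on the Javadoc in the diff above.
    static final String INPROGRESS_SUFFIX = ".inprogress";

    // Returns the new file on success; throws if renameTo reports failure.
    static File renameToInProgress(File file) throws IOException {
        File newFile = new File(file.getPath() + INPROGRESS_SUFFIX);
        if (file.renameTo(newFile)) {
            return newFile;
        }
        throw new IOException("Rename of " + file + " to " + newFile + " failed");
    }

    public static void main(String[] args) throws IOException {
        File src = Files.createTempFile("input", ".txt").toFile();
        File renamed = renameToInProgress(src);
        System.out.println(renamed.getName());
        renamed.delete(); // clean up the temporary file
    }
}
```

Returning on the success path and throwing at the end keeps the failure message in one place, which is the shape the commit moves to.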

http://git-wip-us.apache.org/repos/asf/storm/blob/0b07f8b3/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 0412126..835a714 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -103,7 +103,6 @@ public class TestHdfsSpout {
     fs.mkdirs(archive);
     badfiles = new Path(baseFolder.toString() + "/bad");
     fs.mkdirs(badfiles);
-
   }
 
   @After