Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:03 UTC

[12/20] storm git commit: Adding stale lock recovery in DirLock. Added tests for FileLock recovery, DirLock recovery, commit_freq_sec and commit_freq_count, TestHdfsSpout.testLocking, and TestHdfsSemantics; addressed some review comments.

Adding stale lock recovery in DirLock. Added tests for FileLock recovery, DirLock recovery, commit_freq_sec and commit_freq_count, TestHdfsSpout.testLocking, and TestHdfsSemantics; addressed some review comments.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/152856d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/152856d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/152856d1

Branch: refs/heads/master
Commit: 152856d1156065f51430497629ee37412ac098b2
Parents: de37de6
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 17 14:19:54 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../hdfs/common/CmpFilesByModificationTime.java |   6 +-
 .../org/apache/storm/hdfs/common/HdfsUtils.java |   8 +-
 .../org/apache/storm/hdfs/spout/Configs.java    |   4 +-
 .../org/apache/storm/hdfs/spout/DirLock.java    |  37 +++-
 .../org/apache/storm/hdfs/spout/FileLock.java   |  16 +-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  39 ++--
 .../apache/storm/hdfs/spout/TestDirLock.java    |  40 ++--
 .../apache/storm/hdfs/spout/TestFileLock.java   |  33 ++-
 .../storm/hdfs/spout/TestHdfsSemantics.java     | 204 +++++++++++++++++++
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 155 ++++++++++----
 .../src/test/resources/log4j.properties         |  26 +++
 11 files changed, 473 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
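
For orientation before the per-file diffs: the recovery flow this commit adds can be exercised roughly as in the sketch below. This is an illustrative sketch only, not code from the commit; the package placement, paths, timeout and spout id are assumptions, while DirLock, FileLock and the methods called on them are the ones introduced or touched in the diff.

    package org.apache.storm.hdfs.spout;   // in-package only to keep the sketch short

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical driver: how one spout instance recovers locks abandoned by a dead one.
    public class LockRecoverySketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path lockDir = new Path("/tmp/lockdir");   // assumed value of hdfsspout.lock.dir
        int lockTimeoutSec = 30;                   // assumed value of hdfsspout.lock.timeout.sec

        // 1) Serialize scans of the lock dir across spouts. If the previous holder died,
        //    its dir lock file goes stale and can be taken over after lockTimeoutSec.
        DirLock dirLock = DirLock.tryLock(fs, lockDir);
        if (dirLock == null) {
          dirLock = DirLock.takeOwnershipIfStale(fs, lockDir, lockTimeoutSec);
          if (dirLock == null) {
            return;   // another spout holds a live dir lock; try again later
          }
        }
        try {
          // 2) Claim the oldest file lock whose owner stopped heart-beating, if any.
          FileLock stale = FileLock.acquireOldestExpiredLock(fs, lockDir, lockTimeoutSec, "spout-0");
          if (stale != null) {
            System.out.println("would resume abandoned file at offset "
                    + stale.getLastLogEntry().fileOffset);
            stale.release();
          }
        } finally {
          dirLock.release();
        }
      }
    }

A real spout would hold on to the FileLock and resume reading the source file from that offset instead of releasing it immediately; the HdfsSpout changes further down do exactly that.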


http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
index acee9a5..67420aa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
@@ -18,15 +18,15 @@
 
 package org.apache.storm.hdfs.common;
 
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
 
 import java.util.Comparator;
 
 
 public class CmpFilesByModificationTime
-        implements Comparator<LocatedFileStatus> {
+        implements Comparator<FileStatus> {
    @Override
-    public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
+    public int compare(FileStatus o1, FileStatus o2) {
      return new Long(o1.getModificationTime()).compareTo( o2.getModificationTime() );
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 0574c6a..e8df78d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -29,13 +29,12 @@ import org.apache.hadoop.ipc.RemoteException;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 
 public class HdfsUtils {
   /** list files sorted by modification time that have not been modified since 'olderThan'. if
    * 'olderThan' is <= 0 then the filtering is disabled */
-  public static Collection<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
+  public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
           throws IOException {
     ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
 
@@ -43,7 +42,7 @@ public class HdfsUtils {
     while( itr.hasNext() ) {
       LocatedFileStatus fileStatus = itr.next();
       if(olderThan>0) {
-        if( fileStatus.getModificationTime()<olderThan )
+        if( fileStatus.getModificationTime()<=olderThan )
           fstats.add(fileStatus);
       }
       else {
@@ -69,7 +68,7 @@ public class HdfsUtils {
     } catch (FileAlreadyExistsException e) {
       return null;
     } catch (RemoteException e) {
-      if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+      if( e.unwrapRemoteException() instanceof AlreadyBeingCreatedException ) {
         return null;
       } else { // unexpected error
         throw e;
@@ -77,7 +76,6 @@ public class HdfsUtils {
     }
   }
 
-
   public static class Pair<K,V> {
     private K key;
     private V value;
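
Taken together, the helpers in this file give the lock classes their atomic-create primitive and their directory listing. A small hypothetical usage sketch (the paths and the 30-second window are assumptions; the two HdfsUtils methods and their return types are as shown above):

    import java.util.ArrayList;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.HdfsUtils;

    // Hypothetical snippet exercising the two HdfsUtils helpers touched in this diff.
    public class HdfsUtilsSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path marker = new Path("/tmp/filesdir/DIRLOCK");       // assumed path

        // Atomic create-if-absent: null means the file exists or is being created elsewhere.
        FSDataOutputStream out = HdfsUtils.tryCreateFile(fs, marker);
        if (out == null) {
          System.out.println("someone else owns " + marker);
        } else {
          out.close();                                         // we created it atomically
        }

        // Files not modified since 'olderThan' (epoch millis), sorted by modification time;
        // passing 0 disables the age filter.
        long olderThan = System.currentTimeMillis() - 30 * 1000L;
        ArrayList<Path> candidates =
                HdfsUtils.listFilesByModificationTime(fs, new Path("/tmp/filesdir"), olderThan);
        for (Path p : candidates) {
          System.out.println("candidate: " + p);
        }
      }
    }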

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 9a9ae73..93d775b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -27,8 +27,8 @@ public class Configs {
   public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";         // completed files will be moved here
  public static final String BAD_DIR = "hdfsspout.badfiles.dir";            // unparseable files will be moved here
   public static final String LOCK_DIR = "hdfsspout.lock.dir";               // dir in which lock files will be created
-  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  // commit after N records
-  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";      // commit after N secs
+  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";  // commit after N records. 0 disables this.
+  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";      // commit after N secs. cannot be disabled.
   public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";
   public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";   // inactivity duration after which locks are considered candidates for being reassigned to another spout
   public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync";     // if clocks on machines in the Storm cluster are in sync
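
With the clarified comments above, count-based committing can be switched off entirely while time-based committing always stays active. A hypothetical sketch of setting these keys for a topology (values are illustrative, not recommendations; the tests in this commit pass them as strings, so strings are used here too):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.hdfs.spout.Configs;

    // Hypothetical sketch of the commit/lock related settings; values are examples only.
    public class SpoutCommitConfigSketch {
      public static Map<String, Object> commitSettings() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Configs.COMMIT_FREQ_COUNT, "0");    // 0 disables count-based commits
        conf.put(Configs.COMMIT_FREQ_SEC, "10");     // time-based commit every 10s (cannot be disabled)
        conf.put(Configs.LOCK_TIMEOUT, "60");        // locks idle for 60s become takeover candidates
        conf.put(Configs.LOCK_DIR, "/storm/hdfsspout/locks");
        return conf;
      }
    }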

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 0ff2f37..06ca749 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * Facility to sychronize access to HDFS directory. The lock itself is represented
+ * Facility to synchronize access to HDFS directory. The lock itself is represented
  * as a file in the same directory. Relies on atomic file creation.
  */
 public class DirLock {
@@ -51,7 +51,7 @@ public class DirLock {
    * @throws IOException if there were errors
    */
   public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
-    Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+    Path lockFile = getDirLockFile(dir);
 
     try {
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
@@ -69,6 +69,10 @@ public class DirLock {
     }
   }
 
+  private static Path getDirLockFile(Path dir) {
+    return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+  }
+
   private static String threadInfo () {
     return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
             + Thread.currentThread().getName();
@@ -80,6 +84,35 @@ public class DirLock {
     log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
   }
 
+  /** if the lock on the directory is stale, take ownership */
+  public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) {
+    Path dirLockFile = getDirLockFile(dirToLock);
+
+    long now =  System.currentTimeMillis();
+    long expiryTime = now - (lockTimeoutSec*1000);
+
+    try {
+      long modTime = fs.getFileStatus(dirLockFile).getModificationTime();
+      if(modTime <= expiryTime)
+        return takeOwnership(fs, dirLockFile);
+      return null;
+    } catch (IOException e)  {
+      return  null;
+    }
+  }
+
+
+  private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException {
+    // delete and recreate lock file
+    if( fs.delete(dirLockFile, false) ) { // returns false if somebody else already deleted it (to take ownership)
+      FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile);
+      if(ostream!=null)
+        ostream.close();
+      return new DirLock(fs, dirLockFile);
+    }
+    return null;
+  }
+
   public Path getLockFile() {
     return lockFile;
   }
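
The intended semantics of takeOwnershipIfStale() are: refuse while the current holder's lock file is still fresh, succeed once its modification time is older than the timeout. A hypothetical sketch of that sequence (directory and timeout are assumptions; compare the new TestDirLock.testLockRecovery further down):

    package org.apache.storm.hdfs.spout;   // in-package only to keep the sketch short

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch of DirLock's stale-lock semantics.
    public class DirLockStaleSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path lockDir = new Path("/tmp/lockdir");                 // assumed directory
        int timeoutSec = 5;                                      // assumed lock timeout

        DirLock holder = DirLock.tryLock(fs, lockDir);           // owner creates the lock file

        DirLock thief = DirLock.takeOwnershipIfStale(fs, lockDir, timeoutSec);
        System.out.println(thief == null ? "lock still fresh, takeover refused" : "unexpected takeover");

        Thread.sleep(timeoutSec * 1000L + 500);                  // let the lock file's mtime go stale
        thief = DirLock.takeOwnershipIfStale(fs, lockDir, timeoutSec);
        if (thief != null) {
          System.out.println("took over stale lock " + thief.getLockFile());
          thief.release();
        }
        holder.release();   // harmless even after takeover: the old lock file is already gone
      }
    }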

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 76a459d..b40d1dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -92,6 +92,12 @@ public class FileLock {
   public void release() throws IOException {
     lockFileStream.close();
     fs.delete(lockFile, false);
+    log.debug("Released lock file {}", lockFile);
+  }
+
+  // for testing only.. invoked via reflection
+  private void forceCloseLockFile() throws IOException {
+    lockFileStream.close();
   }
 
   /** returns lock on file or null if file is already locked. throws if unexpected problem */
@@ -135,6 +141,7 @@ public class FileLock {
       if(lastEntry==null) {
         throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
       }
+      log.error("{} , lastModified= {},  expiryTime= {},  diff= {}", lockFile, lastEntry.eventTime, olderThan,  lastEntry.eventTime-olderThan );
       if( lastEntry.eventTime <= olderThan )
         return lastEntry;
     }
@@ -176,8 +183,8 @@ public class FileLock {
     try {
       return new FileLock(fs, lockFile, spoutId, lastEntry);
     } catch (RemoteException e) {
-      if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
-        log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile);
+      if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
+        log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile);
         return null;
       } else { // unexpected error
         throw e;
@@ -198,7 +205,8 @@ public class FileLock {
   public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
           throws IOException {
     // list files
-    long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+    long now = System.currentTimeMillis();
+    long olderThan = now - (locktimeoutSec*1000);
     Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
 
     // locate expired lock files (if any). Try to take ownership (oldest lock first)
@@ -213,7 +221,7 @@ public class FileLock {
       }
     }
     if(listing.isEmpty())
-      log.info("No abandoned files to be refound");
+      log.info("No abandoned lock files found");
     return null;
   }
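
For contrast with the recovery path above, the normal FileLock lifecycle is what produces the heartbeat log that locateOldestExpiredLock()/acquireOldestExpiredLock() later inspect. A hypothetical sketch of that lifecycle (file, lock dir, spout id and offsets are assumptions):

    package org.apache.storm.hdfs.spout;   // in-package only to keep the sketch short

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch of the normal FileLock lifecycle whose heartbeat entries
    // later allow acquireOldestExpiredLock() to detect abandonment.
    public class FileLockLifecycleSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/tmp/filesdir/file1");   // assumed source file
        Path locksDir = new Path("/tmp/lockdir");      // assumed hdfsspout.lock.dir

        FileLock lock = FileLock.tryLock(fs, file, locksDir, "spout-1");
        if (lock == null) {
          return;   // another spout already owns this file
        }
        try {
          // As progress is committed, append a log entry; each heartbeat also refreshes
          // the lock file so it is not considered expired by other spouts.
          lock.heartbeat("0");
          lock.heartbeat("128");
          lock.heartbeat("256");
        } finally {
          lock.release();   // done with the file: close and delete the lock file
        }
      }
    }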
 

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 7977b96..50c2172 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -124,14 +124,14 @@ public class HdfsSpout extends BaseRichSpout {
       return;
     }
 
-    // 2) If no failed tuples, then send tuples from hdfs
+    // 2) If no failed tuples to be retried, then send tuples from hdfs
     while (true) {
       try {
         // 3) Select a new file if one is not open already
         if (reader == null) {
           reader = pickNextFile();
           if (reader == null) {
-            LOG.info("Currently no new files to process under : " + sourceDirPath);
+            LOG.debug("Currently no new files to process under : " + sourceDirPath);
             return;
           }
         }
@@ -165,8 +165,9 @@ public class HdfsSpout extends BaseRichSpout {
         LOG.error("Parsing error when processing at file location " + getFileProgress(reader) +
                 ". Skipping remainder of file.", e);
         markFileAsBad(reader.getFilePath());
-        // note: Unfortunately not emitting anything here due to parse error
-        // will trigger the configured spout wait strategy which is unnecessary
+        // Note: We don't return from this method on ParseException to avoid triggering the
+        // spout wait strategy (due to no emits). Instead we go back into the loop and
+        // generate a tuple from next file
       }
     }
 
@@ -192,7 +193,7 @@ public class HdfsSpout extends BaseRichSpout {
     TimerTask timerTask = new TimerTask() {
       @Override
       public void run() {
-        commitTimeElapsed.set(false);
+        commitTimeElapsed.set(true);
       }
     };
     commitTimer.schedule(timerTask, commitFrequencySec * 1000);
@@ -206,7 +207,8 @@ public class HdfsSpout extends BaseRichSpout {
   private void markFileAsDone(Path filePath) {
     fileReadCompletely = false;
     try {
-      renameCompletedFile(reader.getFilePath());
+      Path newFile = renameCompletedFile(reader.getFilePath());
+      LOG.info("Completed processing {}", newFile);
     } catch (IOException e) {
       LOG.error("Unable to archive completed file" + filePath, e);
     }
@@ -220,7 +222,7 @@ public class HdfsSpout extends BaseRichSpout {
     String originalName = new Path(fileNameMinusSuffix).getName();
     Path  newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
 
-    LOG.info("Moving bad file to " + newFile);
+    LOG.info("Moving bad file {} to {} ", originalName, newFile);
     try {
       if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
         throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
@@ -254,7 +256,7 @@ public class HdfsSpout extends BaseRichSpout {
   public void open(Map conf, TopologyContext context,  SpoutOutputCollector collector) {
     this.conf = conf;
     final String FILE_SYSTEM = "filesystem";
-    LOG.info("Opening");
+    LOG.info("Opening HDFS Spout");
     this.collector = collector;
     this.hdfsConfig = new Configuration();
     this.tupleCounter = 0;
@@ -436,7 +438,8 @@ public class HdfsSpout extends BaseRichSpout {
   }
 
   private boolean canCommitNow() {
-    if( acksSinceLastCommit >= commitFrequencyCount )
+
+    if( commitFrequencyCount>0 &&  acksSinceLastCommit >= commitFrequencyCount )
       return true;
     return commitTimeElapsed.get();
   }
@@ -455,7 +458,7 @@ public class HdfsSpout extends BaseRichSpout {
       if (lock != null) {
         Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
         String resumeFromOffset = lock.getLastLogEntry().fileOffset;
-        LOG.info("Processing abandoned file : {}", file);
+        LOG.info("Resuming processing of abandoned file : {}", file);
         return createFileReader(file, resumeFromOffset);
       }
 
@@ -468,12 +471,12 @@ public class HdfsSpout extends BaseRichSpout {
         if( file.getName().endsWith(ignoreSuffix) )
           continue;
 
-        LOG.info("Processing : {} ", file);
         lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
         if( lock==null ) {
-          LOG.info("Unable to get lock, so skipping file: {}", file);
+          LOG.debug("Unable to get lock, so skipping file: {}", file);
           continue; // could not lock, so try another file.
         }
+        LOG.info("Processing : {} ", file);
         Path newFile = renameSelectedFile(file);
         return createFileReader(newFile);
       }
@@ -494,8 +497,11 @@ public class HdfsSpout extends BaseRichSpout {
   private FileLock getOldestExpiredLock() throws IOException {
     // 1 - acquire lock on dir
     DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
-    if (dirlock == null)
-      return null;
+    if (dirlock == null) {
+      dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
+      if (dirlock == null)
+        return null;
+    }
     try {
       // 2 - if clocks are in sync then simply take ownership of the oldest expired lock
       if (clocksInSync)
@@ -606,14 +612,15 @@ public class HdfsSpout extends BaseRichSpout {
   }
 
 
+  // renames files and returns the new file path
   private Path renameCompletedFile(Path file) throws IOException {
     String fileName = file.toString();
     String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
     String newName = new Path(fileNameMinusSuffix).getName();
 
     Path  newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
-    LOG.debug("Renaming complete file to " + newFile);
-    LOG.info("Completed file " + fileNameMinusSuffix );
+    LOG.debug("Renaming complete file to {} ", newFile);
+    LOG.info("Completed file {}", fileNameMinusSuffix );
     if (!hdfs.rename(file, newFile) ) {
       throw new IOException("Rename failed for file: " + file);
     }
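
The canCommitNow() change above gives COMMIT_FREQ_COUNT = 0 its documented "disabled" meaning, while the timer-based trigger (whose TimerTask now correctly sets the flag to true) keeps firing on its own. The sketch below is a simplified, self-contained paraphrase of that trigger policy using only JDK classes; it is not the spout's actual implementation.

    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical, simplified model of the commit trigger: commit when enough tuples were
    // acked (if that trigger is enabled) or when the commit timer has elapsed.
    public class CommitTriggerSketch {
      private final int commitFrequencyCount;          // 0 disables the count-based trigger
      private final int commitFrequencySec;
      private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
      private final Timer commitTimer = new Timer(true);
      private int acksSinceLastCommit = 0;

      CommitTriggerSketch(int commitFrequencyCount, int commitFrequencySec) {
        this.commitFrequencyCount = commitFrequencyCount;
        this.commitFrequencySec = commitFrequencySec;
        scheduleTimer();
      }

      private void scheduleTimer() {
        commitTimer.schedule(new TimerTask() {
          @Override
          public void run() {
            commitTimeElapsed.set(true);               // matches the corrected TimerTask above
          }
        }, commitFrequencySec * 1000L);
      }

      void onAck() {
        acksSinceLastCommit++;
        if (canCommitNow()) {
          System.out.println("commit progress after " + acksSinceLastCommit + " acks");
          acksSinceLastCommit = 0;
          commitTimeElapsed.set(false);
          scheduleTimer();
        }
      }

      private boolean canCommitNow() {
        if (commitFrequencyCount > 0 && acksSinceLastCommit >= commitFrequencyCount) {
          return true;
        }
        return commitTimeElapsed.get();
      }

      public static void main(String[] args) throws InterruptedException {
        CommitTriggerSketch trigger = new CommitTriggerSketch(0, 2);   // count trigger disabled, 2s timer
        trigger.onAck();                 // too early: no commit
        Thread.sleep(2500);
        trigger.onAck();                 // timer elapsed: commits now
      }
    }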

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index 667248e..a7b73d6 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -25,16 +25,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 
@@ -45,8 +41,8 @@ public class TestDirLock {
   static FileSystem fs;
   static String hdfsURI;
   static HdfsConfiguration conf = new  HdfsConfiguration();
-
-  private Path lockDir = new Path("/tmp/lockdir");
+  static final int LOCK_EXPIRY_SEC = 1;
+  private Path locksDir = new Path("/tmp/lockdir");
 
   @BeforeClass
   public static void setupClass() throws IOException {
@@ -65,23 +61,23 @@ public class TestDirLock {
 
   @Before
   public void setUp() throws Exception {
-    assert fs.mkdirs(lockDir) ;
+    assert fs.mkdirs(locksDir) ;
   }
 
   @After
   public void tearDown() throws Exception {
-    fs.delete(lockDir, true);
+    fs.delete(locksDir, true);
   }
 
 
   @Test
   public void testBasicLocking() throws Exception {
     // 1 grab lock
-    DirLock lock = DirLock.tryLock(fs, lockDir);
+    DirLock lock = DirLock.tryLock(fs, locksDir);
     Assert.assertTrue(fs.exists(lock.getLockFile()));
 
     // 2 try to grab another lock while dir is locked
-    DirLock lock2 = DirLock.tryLock(fs, lockDir); // should fail
+    DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail
     Assert.assertNull(lock2);
 
     // 3 let go first lock
@@ -89,7 +85,7 @@ public class TestDirLock {
     Assert.assertFalse(fs.exists(lock.getLockFile()));
 
     // 4 try locking again
-    lock2  = DirLock.tryLock(fs, lockDir);
+    lock2  = DirLock.tryLock(fs, locksDir);
     Assert.assertTrue(fs.exists(lock2.getLockFile()));
     lock2.release();
     Assert.assertFalse(fs.exists(lock.getLockFile()));
@@ -99,7 +95,7 @@ public class TestDirLock {
 
   @Test
   public void testConcurrentLocking() throws Exception {
-    DirLockingThread[] thds = startThreads(100, lockDir );
+    DirLockingThread[] thds = startThreads(100, locksDir);
     for (DirLockingThread thd : thds) {
       thd.join();
       if( !thd.cleanExit)
@@ -107,7 +103,7 @@ public class TestDirLock {
       Assert.assertTrue(thd.cleanExit);
     }
 
-    Path lockFile = new Path(lockDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
+    Path lockFile = new Path(locksDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
     Assert.assertFalse(fs.exists(lockFile));
   }
 
@@ -124,6 +120,24 @@ public class TestDirLock {
     return result;
   }
 
+  @Test
+  public void testLockRecovery() throws Exception {
+    DirLock lock1 = DirLock.tryLock(fs, locksDir);   // should pass
+    Assert.assertNotNull(lock1);
+
+    DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should fail
+    Assert.assertNull(lock2);
+
+    Thread.sleep(LOCK_EXPIRY_SEC*1000 + 500); // wait for lock to expire
+    Assert.assertTrue(fs.exists(lock1.getLockFile()));
+
+    DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should pass now
+    Assert.assertNotNull(lock3);
+    Assert.assertTrue(fs.exists(lock3.getLockFile()));
+    lock3.release();
+    Assert.assertFalse(fs.exists(lock3.getLockFile()));
+    lock1.release(); // should not throw
+  }
 
   class DirLockingThread extends Thread {
 

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 1f22a5b..a97b3f2 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -33,10 +33,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 
 public class TestFileLock {
@@ -68,11 +70,13 @@ public class TestFileLock {
   @Before
   public void setUp() throws Exception {
     assert fs.mkdirs(filesDir) ;
+    assert fs.mkdirs(locksDir) ;
   }
 
   @After
   public void tearDown() throws Exception {
     fs.delete(filesDir, true);
+    fs.delete(locksDir, true);
   }
 
   @Test
@@ -261,9 +265,9 @@ public class TestFileLock {
   }
 
   @Test
-  public void testStaleLockRecovery() throws Exception {
+  public void testLockRecovery() throws Exception {
     final int LOCK_EXPIRY_SEC = 1;
-    final int WAIT_MSEC = 1500;
+    final int WAIT_MSEC = LOCK_EXPIRY_SEC*1000 + 500;
     Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
     Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
     Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
@@ -284,27 +288,38 @@ public class TestFileLock {
       HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
       Assert.assertNull(expired);
 
+      // 1) Simulate lock file lease expiring and getting closed by HDFS
+      closeUnderlyingLockFile(lock3);
+
       // 2) wait for all 3 locks to expire then heart beat on 2 locks
-      Thread.sleep(WAIT_MSEC);
+      Thread.sleep(WAIT_MSEC*2); // wait for locks to expire
       lock1.heartbeat("1");
       lock2.heartbeat("1");
 
-      //todo: configure the HDFS lease timeout
-
       // 3) Take ownership of stale lock
       FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1");
-//      Assert.assertNotNull(lock3b);
-//      Assert.assertEquals("Expected lock3 file", lock3b.getLockFile(), lock3.getLockFile());
-    }finally {
+      Assert.assertNotNull(lock3b);
+      Assert.assertEquals("Expected lock3 file", Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()), lock3.getLockFile());
+    } finally {
       lock1.release();
       lock2.release();
       lock3.release();
       fs.delete(file1, false);
       fs.delete(file2, false);
-      fs.delete(file3, false);
+      try {
+        fs.delete(file3, false);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }
   }
 
+  private void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException {
+    Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile");
+    m.setAccessible(true);
+    m.invoke(lock);
+  }
+
   /** return null if file not found */
   private ArrayList<String> readTextFile(Path file) throws IOException {
     FSDataInputStream os = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
new file mode 100644
index 0000000..6628cc9
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestHdfsSemantics {
+
+  static MiniDFSCluster.Builder builder;
+  static MiniDFSCluster hdfsCluster;
+  static FileSystem fs;
+  static String hdfsURI;
+  static HdfsConfiguration conf = new  HdfsConfiguration();
+
+  private Path dir = new Path("/tmp/filesdir");
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
+    builder = new MiniDFSCluster.Builder(new Configuration());
+    hdfsCluster = builder.build();
+    fs  = hdfsCluster.getFileSystem();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    fs.close();
+    hdfsCluster.shutdown();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    assert fs.mkdirs(dir) ;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(dir, true);
+  }
+
+
+  @Test
+  public void testDeleteSemantics() throws Exception {
+    Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+//    try {
+    // 1) Delete absent file - should return false
+    Assert.assertFalse(fs.exists(file));
+    try {
+      Assert.assertFalse(fs.delete(file, false));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // 2) deleting open file - should return true
+    fs.create(file, false);
+    Assert.assertTrue(fs.delete(file, false));
+
+    // 3) deleting closed file  - should return true
+    FSDataOutputStream os = fs.create(file, false);
+    os.close();
+    Assert.assertTrue(fs.exists(file));
+    Assert.assertTrue(fs.delete(file, false));
+    Assert.assertFalse(fs.exists(file));
+  }
+
+  @Test
+  public void testConcurrentDeletion() throws Exception {
+    Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    fs.create(file).close();
+    // 1 concurrent deletion - only one thread should succeed
+    FileDeletionThread[] thds = startThreads(10, file);
+    int successCount=0;
+    for (FileDeletionThread thd : thds) {
+      thd.join();
+      if( thd.succeeded)
+        successCount++;
+      if(thd.exception!=null)
+        Assert.assertNotNull(thd.exception);
+    }
+    System.err.println(successCount);
+    Assert.assertEquals(1, successCount);
+
+  }
+
+  @Test
+  public void testAppendSemantics() throws Exception {
+    //1 try to append to an open file
+    Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    FSDataOutputStream os1 = fs.create(file1, false);
+    try {
+      fs.append(file1); // should fail
+      Assert.assertTrue("Append did not throw an exception", false);
+    } catch (RemoteException e) {
+      // expecting AlreadyBeingCreatedException inside RemoteException
+      Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+    }
+
+    //2 try to append to a closed file
+    os1.close();
+    FSDataOutputStream os2 = fs.append(file1); // should pass
+    os2.close();
+  }
+
+  @Test
+  public void testDoubleCreateSemantics() throws Exception {
+    //1 create an already existing open file w/o override flag
+    Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+    FSDataOutputStream os1 = fs.create(file1, false);
+    try {
+      fs.create(file1, false); // should fail
+      Assert.assertTrue("Create did not throw an exception", false);
+    } catch (RemoteException e) {
+      Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+    }
+    //2 close file and retry creation
+    os1.close();
+    try {
+      fs.create(file1, false);  // should still fail
+    } catch (FileAlreadyExistsException e) {
+      // expecting this exception
+    }
+
+    //3 delete file and retry creation
+    fs.delete(file1, false);
+    FSDataOutputStream os2 = fs.create(file1, false);  // should pass
+    Assert.assertNotNull(os2);
+    os2.close();
+  }
+
+
+  private FileDeletionThread[] startThreads(int thdCount, Path file)
+          throws IOException {
+    FileDeletionThread[] result = new FileDeletionThread[thdCount];
+    for (int i = 0; i < thdCount; i++) {
+      result[i] = new FileDeletionThread(i, fs, file);
+    }
+
+    for (FileDeletionThread thd : result) {
+      thd.start();
+    }
+    return result;
+  }
+
+  private static class FileDeletionThread extends Thread {
+
+    private final int thdNum;
+    private final FileSystem fs;
+    private final Path file;
+    public boolean succeeded;
+    public Exception exception = null;
+
+    public FileDeletionThread(int thdNum, FileSystem fs, Path file)
+            throws IOException {
+      this.thdNum = thdNum;
+      this.fs = fs;
+      this.file = file;
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("FileDeletionThread-" + thdNum);
+      try {
+        succeeded = fs.delete(file, false);
+      } catch (Exception e) {
+        exception = e;
+      }
+    } // run()
+
+  } // class FileDeletionThread
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index d967572..98d21f8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -105,7 +105,7 @@ public class TestHdfsSpout {
 
   @After
   public void shutDown() throws IOException {
-    fs.delete(new Path(baseFolder.toString()),true);
+    fs.delete(new Path(baseFolder.toString()), true);
   }
 
   @Test
@@ -134,7 +134,6 @@ public class TestHdfsSpout {
     checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
   }
 
-
   private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path txtFile : txtFiles) {
@@ -196,10 +195,6 @@ public class TestHdfsSpout {
     listDir(archive);
   }
 
-  private List<String> listBadDir() throws IOException {
-    return listDir(badfiles);
-  }
-
   private List<String> listDir(Path p) throws IOException {
     ArrayList<String> result = new ArrayList<>();
     System.err.println("*** Listing " + p);
@@ -207,7 +202,7 @@ public class TestHdfsSpout {
     while ( fileNames.hasNext() ) {
       LocatedFileStatus fileStatus = fileNames.next();
       System.err.println(fileStatus.getPath());
-      result.add(fileStatus.getPath().toString());
+      result.add(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString());
     }
     return result;
   }
@@ -244,50 +239,127 @@ public class TestHdfsSpout {
     checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2);
   }
 
-// - TODO: this test needs the spout to fail with an exception
   @Test
-  public void testFailure() throws Exception {
-
+  public void testReadFailures() throws Exception {
+    // 1) create couple of input files to read
     Path file1 = new Path(source.toString() + "/file1.txt");
-    createTextFile(file1, 5);
+    Path file2 = new Path(source.toString() + "/file2.txt");
 
-    listDir(source);
+    createTextFile(file1, 6);
+    createTextFile(file2, 7);
+    Assert.assertEquals(2, listDir(source).size());
 
+    // 2) run spout
     Map conf = getDefaultConfig();
-//    conf.put(HdfsSpout.Configs.BACKOFF_SEC, "2");
     HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName());
-    List<String> res = runSpout(spout, "r3");
-    for (String re : res) {
-      System.err.println(re);
-    }
-
-    listCompletedDir();
-    List<String> badFiles = listBadDir();
-    Assert.assertEquals( badFiles.size(), 1);
-    Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 1);
+    List<String> res = runSpout(spout, "r11");
+    String[] expected = new String[] {"[line 0]","[line 1]","[line 2]","[line 0]","[line 1]","[line 2]"};
+    Assert.assertArrayEquals(expected, res.toArray());
+
+    // 3) make sure 6 lines (3 from each file) were read in all
+    Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 6);
+    ArrayList<Path> badFiles = HdfsUtils.listFilesByModificationTime(fs, badfiles, 0);
+    Assert.assertEquals(badFiles.size(), 2);
   }
 
-  // @Test
+  // check lock creation/deletion and contents
+   @Test
   public void testLocking() throws Exception {
+     Path file1 = new Path(source.toString() + "/file1.txt");
+     createTextFile(file1, 10);
+
+     // 0) config spout to log progress in lock file for each tuple
+     Map conf = getDefaultConfig();
+     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+     conf.put(Configs.COMMIT_FREQ_SEC, "100"); // make it irrelevant
+     HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+     // 1) read initial lines in file, then check if lock exists
+     List<String> res = runSpout(spout, "r5");
+     Assert.assertEquals(5, res.size());
+     List<String> lockFiles = listDir(spout.getLockDirPath());
+     Assert.assertEquals(1, lockFiles.size());
+
+     // 2) check log file content line count == tuples emitted + 1
+     List<String> lines = readTextFile(fs, lockFiles.get(0));
+     Assert.assertEquals(lines.size(), res.size()+1);
+
+     // 3) read remaining lines in file, then ensure lock is gone
+     runSpout(spout, "r6");
+     lockFiles = listDir(spout.getLockDirPath());
+     Assert.assertEquals(0, lockFiles.size());
+
+
+     // 4)  --- Create another input file and reverify same behavior ---
+     Path file2 = new Path(source.toString() + "/file2.txt");
+     createTextFile(file2, 10);
+
+     // 5) read initial lines in file, then check if lock exists
+     res = runSpout(spout, "r5");
+     Assert.assertEquals(15, res.size());
+     lockFiles = listDir(spout.getLockDirPath());
+     Assert.assertEquals(1, lockFiles.size());
+
+     // 6) check log file content line count == tuples emitted + 1
+     lines = readTextFile(fs, lockFiles.get(0));
+     Assert.assertEquals(6, lines.size());
+
+     // 7) read remaining lines in file, then ensure lock is gone
+     runSpout(spout, "r6");
+     lockFiles = listDir(spout.getLockDirPath());
+     Assert.assertEquals(0, lockFiles.size());
+   }
+
+  @Test
+  public void testLockLoggingFreqCount() throws Exception {
     Path file1 = new Path(source.toString() + "/file1.txt");
-    createTextFile(file1, 5);
+    createTextFile(file1, 10);
 
-    listDir(source);
+    // 0) config spout to log progress in lock file for each tuple
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "2");  // 1 lock log entry every 2 tuples
+    conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // make it irrelevant for this test
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+    // 1) read 5 lines in file,
+    runSpout(spout, "r5");
+
+    // 2) check log file contents
+    String lockFile = listDir(spout.getLockDirPath()).get(0);
+    List<String> lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(lines.size(), 3);
+
+    // 3) read 6th line and see if another log entry was made
+    runSpout(spout, "r1");
+    lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(lines.size(), 4);
+  }
 
+  @Test
+  public void testLockLoggingFreqSec() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 10);
+
+    // 0) config spout to log progress in lock file for each tuple
     Map conf = getDefaultConfig();
-    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
-    conf.put(Configs.COMMIT_FREQ_SEC, "1");
+    conf.put(Configs.COMMIT_FREQ_COUNT, "0");  // disable it
+    conf.put(Configs.COMMIT_FREQ_SEC, "2"); // log every 2 sec
+
     HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
-    List<String> res = runSpout(spout,"r4");
-    for (String re : res) {
-      System.err.println(re);
-    }
-    List<String> lockFiles = listDir(spout.getLockDirPath());
-    Assert.assertEquals(1, lockFiles.size());
-    runSpout(spout, "r3");
-    List<String> lines = readTextFile(fs, lockFiles.get(0));
-    System.err.println(lines);
-    Assert.assertEquals(6, lines.size());
+
+    // 1) read 5 lines in file
+    runSpout(spout, "r5");
+
+    // 2) check log file contents
+    String lockFile = listDir(spout.getLockDirPath()).get(0);
+    List<String> lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(lines.size(), 1);
+    Thread.sleep(3000); // allow freq_sec to expire
+
+    // 3) read another line and see if another log entry was made
+    runSpout(spout, "r1");
+    lines = readTextFile(fs, lockFile);
+    Assert.assertEquals(2, lines.size());
   }
 
   private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
@@ -320,7 +392,7 @@ public class TestHdfsSpout {
   }
 
   /**
-   * Execute a sequence of calls to EventHubSpout.
+   * Execute a sequence of calls on HdfsSpout.
    *
    * @param cmds: set of commands to run,
    * e.g. "r,r,r,r,a1,f2,...". The commands are:
@@ -427,7 +499,8 @@ public class TestHdfsSpout {
 
 
 
-  // Throws exceptions for 2nd and 3rd line read attempt
+  // Throws IOExceptions for 3rd & 4th call to next(), succeeds on 5th, thereafter
+  // throws ParseException. Effectively produces 3 lines (1,2 & 3) from each file read
   static class MockTextFailingReader extends TextFileReader {
     int readAttempts = 0;
 
@@ -438,9 +511,9 @@ public class TestHdfsSpout {
     @Override
     public List<Object> next() throws IOException, ParseException {
       readAttempts++;
-      if (readAttempts == 2) {
+      if (readAttempts == 3 || readAttempts ==4) {
         throw new IOException("mock test exception");
-      } else if (readAttempts >= 3) {
+      } else if (readAttempts > 5 ) {
         throw new ParseException("mock test exception", null);
       }
       return super.next();

http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/resources/log4j.properties b/external/storm-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..1f92e45
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+log4j.rootLogger = WARN, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.storm.hdfs = INFO
+