Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:02 UTC

[11/20] storm git commit: More tests for FileLock; fixing UT TestHdfsSpout.testSimpleSequenceFile

More tests for FileLock; fixing UT TestHdfsSpout.testSimpleSequenceFile


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de37de68
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de37de68
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de37de68

Branch: refs/heads/master
Commit: de37de68f3a97c6e0d4d6aa38f972fd8d1ecb032
Parents: dcc930b
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Mon Dec 14 16:19:54 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/common/HdfsUtils.java |   1 -
 .../org/apache/storm/hdfs/spout/FileLock.java   |  47 +++-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |   2 +-
 .../apache/storm/hdfs/spout/TestDirLock.java    |  13 +-
 .../apache/storm/hdfs/spout/TestFileLock.java   | 273 ++++++++++++++++++-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  |  12 +-
 6 files changed, 314 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index e8c32aa..0574c6a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.storm.hdfs.spout.DirLock;
 
 import java.io.IOException;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 1974e44..76a459d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -23,12 +23,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.storm.hdfs.common.HdfsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Collection;
@@ -44,12 +45,12 @@ public class FileLock {
   private final FileSystem fs;
   private final String componentID;
   private final Path lockFile;
-  private final DataOutputStream lockFileStream;
+  private final FSDataOutputStream lockFileStream;
   private LogEntry lastEntry;
 
  private static final Logger log = LoggerFactory.getLogger(FileLock.class);
 
-  private FileLock(FileSystem fs, Path lockFile, DataOutputStream lockFileStream, String spoutId)
+  private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId)
           throws IOException {
     this.fs = fs;
     this.lockFile = lockFile;
@@ -83,7 +84,8 @@ public class FileLock {
       lockFileStream.writeBytes(System.lineSeparator() + line);
     else
       lockFileStream.writeBytes(line);
-    lockFileStream.flush();
+    lockFileStream.hflush();
+
     lastEntry = entry; // update this only after writing to hdfs
   }
 
@@ -125,7 +127,8 @@ public class FileLock {
    */
   public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
           throws IOException {
-    if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) {
+    long modifiedTime = fs.getFileStatus(lockFile).getModificationTime();
+    if( modifiedTime <= olderThan ) { // the HDFS timestamp says the lock looks stale
       //Impt: HDFS timestamp may not reflect recent appends, so we double check the
       // timestamp in last line of file to see when the last update was made
       LogEntry lastEntry =  getLastEntry(fs, lockFile);
@@ -158,18 +161,28 @@ public class FileLock {
 
   // takes ownership of the lock file
   /**
-   * Takes ownership of the lock file.
+   * Takes ownership of the lock file if possible.
    * @param lockFile
   * @param lastEntry   last entry in the lock file. This param is an optimization:
 *                    we don't scan the lock file again to find its last entry here since
 *                    that has already been done once by the logic used to check if the lock
 *                    file is stale, so this value comes from that earlier scan.
    * @param spoutId     spout id
-   * @return
+   * @throws IOException if unable to acquire
+   * @return null if the lock file is being used by another thread
    */
   public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
           throws IOException {
-    return new FileLock(fs, lockFile, spoutId, lastEntry);
+    try {
+      return new FileLock(fs, lockFile, spoutId, lastEntry);
+    } catch (RemoteException e) {
+      if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
+        log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile);
+        return null;
+      } else { // unexpected error
+        throw e;
+      }
+    }
   }
 
   /**
@@ -188,15 +201,19 @@ public class FileLock {
     long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
     Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
 
-    // locate oldest expired lock file (if any) and take ownership
+    // locate expired lock files (if any). Try to take ownership (oldest lock first)
     for (Path file : listing) {
       if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
         continue;
       LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
-      if(lastEntry!=null)
-        return FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+      if(lastEntry!=null) {
+        FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+        if(lock!=null)
+          return lock;
+      }
     }
-    log.info("No abandoned files found");
+    if(listing.isEmpty())
+      log.info("No abandoned files to be refound");
     return null;
   }
 
@@ -209,14 +226,14 @@ public class FileLock {
    * @param fs
    * @param lockFilesDir
    * @param locktimeoutSec
-   * @param spoutId
    * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found
    * @throws IOException
    */
-  public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+  public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec)
           throws IOException {
     // list files
-    long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+    long now =  System.currentTimeMillis();
+    long olderThan = now - (locktimeoutSec*1000);
     Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
 
     // locate oldest expired lock file (if any) and take ownership

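For context on how these FileLock pieces fit together: each lock owns one lock file under the lock dir, appends a LogEntry line per heartbeat, and now hflush()es it; plain flush() only empties the client-side buffer, so other processes judging staleness might never see recent heartbeats. A minimal usage sketch, assuming only the FileLock methods visible in this patch (tryLock, heartbeat, acquireOldestExpiredLock); the class and method names below are ours:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.spout.FileLock;

    public class FileLockUsageSketch {
      // Try a fresh lock on the file first; if that fails, try to recover
      // the oldest lock abandoned by a dead spout. Null means neither worked.
      static FileLock lockOrRecover(FileSystem fs, Path file, Path locksDir,
                                    int lockTimeoutSec, String spoutId)
              throws IOException {
        FileLock lock = FileLock.tryLock(fs, file, locksDir, spoutId);
        if (lock != null) {
          lock.heartbeat("0"); // record progress; readers judge staleness by these entries
          return lock;
        }
        // With this patch, takeOwnership() inside the call below returns null
        // instead of throwing when HDFS reports AlreadyBeingCreatedException,
        // i.e. the previous owner's write lease is still active.
        return FileLock.acquireOldestExpiredLock(fs, locksDir, lockTimeoutSec, spoutId);
      }
    }

The flush-to-hflush switch matters for the staleness checks above: getLastEntryIfStale() trusts the last line of the lock file over the HDFS modification time, and that last line is only visible to other readers once it has reached the datanodes.
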
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index d8aa3f4..7977b96 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -504,7 +504,7 @@ public class HdfsSpout extends BaseRichSpout {
       // 3 - if clocks are not in sync ..
       if( lastExpiredLock == null ) {
         // just make a note of the oldest expired lock now and check if it's still unmodified after lockTimeoutSec
-        lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+        lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec);
         lastExpiredLockTime = System.currentTimeMillis();
         return null;
       }

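The call site above sits inside a two-phase staleness check (only partially visible in the hunk) designed to tolerate clock skew between the spout host and the NameNode: the spout first records the oldest expired lock plus its own wall-clock time and returns empty-handed, then attempts the takeover only on a later call, after a full lockTimeoutSec has elapsed on its local clock. A hypothetical condensed form; lastExpiredLock, lastExpiredLockTime, hdfs, lockDirPath, lockTimeoutSec and spoutId are the spout fields visible in the hunk, while Pair.getKey() is assumed to mirror the getValue() accessor used in the tests:

    // Condensed sketch of the two-phase takeover, not the literal spout code
    // (the real method also re-verifies the candidate; elided here).
    private FileLock tryRecoverAbandonedLock() throws IOException {
      if (lastExpiredLock == null) {
        // Phase 1: remember the candidate, judged stale by HDFS metadata.
        lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec);
        lastExpiredLockTime = System.currentTimeMillis();
        return null; // try again on a later call
      }
      // Phase 2: steal it only after a full timeout measured on our own clock.
      if (System.currentTimeMillis() - lastExpiredLockTime >= lockTimeoutSec * 1000L) {
        FileLock lock = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(),
                                               lastExpiredLock.getValue(), spoutId);
        lastExpiredLock = null;
        return lock; // may still be null if the old owner's lease is alive
      }
      return null;
    }
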
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index bdb0cdf..667248e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -127,18 +127,21 @@ public class TestDirLock {
 
   class DirLockingThread extends Thread {
 
+    private int thdNum;
     private final FileSystem fs;
     private final Path dir;
     public boolean cleanExit = false;
 
-    public DirLockingThread(int thdNum,FileSystem fs, Path dir) throws IOException {
+    public DirLockingThread(int thdNum,FileSystem fs, Path dir)
+            throws IOException {
+      this.thdNum = thdNum;
       this.fs = fs;
       this.dir = dir;
-      Thread.currentThread().setName("DirLockingThread-" + thdNum);
     }
 
     @Override
     public void run() {
+      Thread.currentThread().setName("DirLockingThread-" + thdNum);
       DirLock lock = null;
       try {
         do {
@@ -146,7 +149,7 @@ public class TestDirLock {
           lock = DirLock.tryLock(fs, dir);
           System.err.println("Acquired lock " + getName());
           if(lock==null) {
-            System.out.println("Retrying lock - " + Thread.currentThread().getId());
+            System.out.println("Retrying lock - " + getName());
           }
         } while (lock==null);
         cleanExit= true;
@@ -164,7 +167,7 @@ public class TestDirLock {
           }
       }
       System.err.println("Thread exiting " + getName());
-    }
+    } // run()
 
-  }
+  } // class DirLockingThread
 }

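The setName() move in this hunk fixes a real bug, not just style: a constructor executes on the thread that calls new, so Thread.currentThread().setName(...) in the constructor renamed the test's main thread and left every worker with a default name. Inside run() the call executes on the worker itself. A self-contained illustration (class and names are ours, purely for demonstration):

    public class ThreadNameDemo {
      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread() {
          @Override
          public void run() {
            // Correct: runs on the worker, so the worker gets the name.
            Thread.currentThread().setName("worker-0");
            System.out.println("in run(): " + Thread.currentThread().getName());
          }
        };
        // A Thread.currentThread().setName(...) issued here, e.g. from the
        // worker's constructor, would rename "main" instead of the worker.
        worker.start();
        worker.join();
        System.out.println("creator is still: " + Thread.currentThread().getName());
      }
    }

Calling worker.setName(...) on the Thread object would also be safe from any thread; the trap is specifically Thread.currentThread() executed on the wrong thread.
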
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 8031041..1f22a5b 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -20,10 +20,12 @@ package org.apache.storm.hdfs.spout;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.storm.hdfs.common.HdfsUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -31,7 +33,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
 
 public class TestFileLock {
 
@@ -41,8 +47,8 @@ public class TestFileLock {
   static String hdfsURI;
   static HdfsConfiguration conf = new  HdfsConfiguration();
 
-  private Path filesDir = new Path("/tmp/lockdir");
-  private Path locksDir = new Path("/tmp/lockdir");
+  private Path filesDir = new Path("/tmp/filesdir");
+  private Path locksDir = new Path("/tmp/locksdir");
 
   @BeforeClass
   public static void setupClass() throws IOException {
@@ -70,7 +76,7 @@ public class TestFileLock {
   }
 
   @Test
-  public void testBasic() throws Exception {
+  public void testBasicLocking() throws Exception {
   // create empty files in filesDir
     Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
     Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
@@ -82,7 +88,7 @@ public class TestFileLock {
     Assert.assertNotNull(lock1a);
     Assert.assertTrue(fs.exists(lock1a.getLockFile()));
     Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify lock file location
-    Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // very lock filename
+    Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // verify lock filename
 
     // acquire another lock on file1 and verify it failed
     FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1");
@@ -97,14 +103,14 @@ public class TestFileLock {
     Assert.assertNotNull(lock1c);
     Assert.assertTrue(fs.exists(lock1c.getLockFile()));
     Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify lock file location
-    Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // very lock filename
+    Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // verify lock filename
 
     // try locking another file2 at the same time
     FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1");
     Assert.assertNotNull(lock2a);
     Assert.assertTrue(fs.exists(lock2a.getLockFile()));
     Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify lock file location
-    Assert.assertEquals(lock2a.getLockFile().getName(), file1.getName()); // very lock filename
+    Assert.assertEquals(lock2a.getLockFile().getName(), file2.getName()); // verify lock filename
 
     // release both locks
     lock2a.release();
@@ -113,5 +119,260 @@ public class TestFileLock {
     Assert.assertFalse(fs.exists(lock1c.getLockFile()));
   }
 
+  @Test
+  public void testHeartbeat() throws Exception {
+    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+    fs.create(file1).close();
+
+    // acquire lock on file1
+    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    Assert.assertNotNull(lock1);
+    Assert.assertTrue(fs.exists(lock1.getLockFile()));
+
+    ArrayList<String> lines = readTextFile(lock1.getLockFile());
+    Assert.assertEquals("heartbeats appear to be missing", 1, lines.size());
+
+    // heartbeat on it
+    lock1.heartbeat("1");
+    lock1.heartbeat("2");
+    lock1.heartbeat("3");
+
+    lines = readTextFile(lock1.getLockFile());
+    Assert.assertEquals("heartbeats appear to be missing", 4, lines.size());
+
+    lock1.heartbeat("4");
+    lock1.heartbeat("5");
+    lock1.heartbeat("6");
+
+    lines = readTextFile(lock1.getLockFile());
+    Assert.assertEquals("heartbeats appear to be missing", 7,  lines.size());
+
+    lock1.release();
+    lines = readTextFile(lock1.getLockFile());
+    Assert.assertNull(lines);
+    Assert.assertFalse(fs.exists(lock1.getLockFile()));
+  }
+
+  @Test
+  public void testConcurrentLocking() throws IOException, InterruptedException {
+    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+    fs.create(file1).close();
+
+    FileLockingThread[] thds = startThreads(100, file1, locksDir);
+    for (FileLockingThread thd : thds) {
+      thd.join();
+      if( !thd.cleanExit)
+        System.err.println(thd.getName() + " did not exit cleanly");
+      Assert.assertTrue(thd.cleanExit);
+    }
+
+    Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName());
+    Assert.assertFalse(fs.exists(lockFile));
+  }
+
+  private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path locksDir)
+          throws IOException {
+    FileLockingThread[] result = new FileLockingThread[thdCount];
+    for (int i = 0; i < thdCount; i++) {
+      result[i] = new FileLockingThread(i, fs, fileToLock, locksDir, "spout" + Integer.toString(i));
+    }
+
+    for (FileLockingThread thd : result) {
+      thd.start();
+    }
+    return result;
+  }
+
+
+  @Test
+  public void testStaleLockDetection_SingleLock() throws Exception {
+    final int LOCK_EXPIRY_SEC = 1;
+    final int WAIT_MSEC = 1500;
+    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+    fs.create(file1).close();
+    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    try {
+      // acquire lock on file1
+      Assert.assertNotNull(lock1);
+      Assert.assertTrue(fs.exists(lock1.getLockFile()));
+      Thread.sleep(WAIT_MSEC);   // wait for lock to expire
+      HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+      Assert.assertNotNull(expired);
+
+      // heartbeat, ensure it's no longer stale, and read back the heartbeat data
+      lock1.heartbeat("1");
+      expired = FileLock.locateOldestExpiredLock(fs, locksDir, 1);
+      Assert.assertNull(expired);
+
+      FileLock.LogEntry lastEntry = lock1.getLastLogEntry();
+      Assert.assertNotNull(lastEntry);
+      Assert.assertEquals("1", lastEntry.fileOffset);
+
+      // wait and check for expiry again
+      Thread.sleep(WAIT_MSEC);
+      expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+      Assert.assertNotNull(expired);
+    } finally {
+      lock1.release();
+      fs.delete(file1, false);
+    }
+  }
+
+  @Test
+  public void testStaleLockDetection_MultipleLocks() throws Exception {
+    final int LOCK_EXPIRY_SEC = 1;
+    final int WAIT_MSEC = 1500;
+    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+    Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+    Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
+
+    fs.create(file1).close();
+    fs.create(file2).close();
+    fs.create(file3).close();
+
+    // 1) acquire locks on file1,file2,file3
+    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
+    FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
+    Assert.assertNotNull(lock1);
+    Assert.assertNotNull(lock2);
+    Assert.assertNotNull(lock3);
+
+    try {
+      HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+      Assert.assertNull(expired);
+
+      // 2) wait for all 3 locks to expire then heart beat on 2 locks and verify stale lock
+      Thread.sleep(WAIT_MSEC);
+      lock1.heartbeat("1");
+      lock2.heartbeat("1");
+
+      expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+      Assert.assertNotNull(expired);
+      Assert.assertEquals("spout3", expired.getValue().componentID);
+    } finally {
+      lock1.release();
+      lock2.release();
+      lock3.release();
+      fs.delete(file1, false);
+      fs.delete(file2, false);
+      fs.delete(file3, false);
+    }
+  }
+
+  @Test
+  public void testStaleLockRecovery() throws Exception {
+    final int LOCK_EXPIRY_SEC = 1;
+    final int WAIT_MSEC = 1500;
+    Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+    Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+    Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
+
+    fs.create(file1).close();
+    fs.create(file2).close();
+    fs.create(file3).close();
+
+    // 1) acquire locks on file1,file2,file3
+    FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+    FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
+    FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
+    Assert.assertNotNull(lock1);
+    Assert.assertNotNull(lock2);
+    Assert.assertNotNull(lock3);
+
+    try {
+      HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+      Assert.assertNull(expired);
+
+      // 2) wait for all 3 locks to expire then heart beat on 2 locks
+      Thread.sleep(WAIT_MSEC);
+      lock1.heartbeat("1");
+      lock2.heartbeat("1");
+
+      //todo: configure the HDFS lease timeout
+
+      // 3) Take ownership of stale lock
+      FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1");
+//      Assert.assertNotNull(lock3b);
+//      Assert.assertEquals("Expected lock3 file", lock3b.getLockFile(), lock3.getLockFile());
+    } finally {
+      lock1.release();
+      lock2.release();
+      lock3.release();
+      fs.delete(file1, false);
+      fs.delete(file2, false);
+      fs.delete(file3, false);
+    }
+  }
+
+  /** return null if file not found */
+  private ArrayList<String> readTextFile(Path file) throws IOException {
+    FSDataInputStream os = null;
+    try {
+      os = fs.open(file);
+      if (os == null)
+        return null;
+      BufferedReader reader = new BufferedReader(new InputStreamReader(os));
+      ArrayList<String> lines = new ArrayList<>();
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        lines.add(line);
+      }
+      return lines;
+    } catch( FileNotFoundException e) {
+      return null;
+    } finally {
+      if(os!=null)
+        os.close();
+    }
+  }
+
+  class FileLockingThread extends Thread {
+
+    private int thdNum;
+    private final FileSystem fs;
+    public boolean cleanExit = false;
+    private Path fileToLock;
+    private Path locksDir;
+    private String spoutId;
+
+    public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path locksDir, String spoutId)
+            throws IOException {
+      this.thdNum = thdNum;
+      this.fs = fs;
+      this.fileToLock = fileToLock;
+      this.locksDir = locksDir;
+      this.spoutId = spoutId;
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("FileLockingThread-" + thdNum);
+      FileLock lock = null;
+      try {
+        do {
+          System.err.println("Trying lock - " + getName());
+          lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId);
+          System.err.println("Acquired lock - " + getName());
+          if(lock==null) {
+            System.out.println("Retrying lock - " + getName());
+          }
+        } while (lock==null);
+        cleanExit = true;
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      finally {
+        try {
+          if(lock!=null) {
+            lock.release();
+            System.err.println("Released lock - " + getName());
+          }
+        } catch (IOException e) {
+          e.printStackTrace(System.err);
+        }
+      }
+      System.err.println("Thread exiting - " + getName());
+    } // run()
 
+  } // class FileLockingThread
 }

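The commented-out assertions in testStaleLockRecovery (and the lease-timeout todo) point at an HDFS constraint: taking over a stale lock means re-opening its lock file for append, and HDFS permits a single writer per file, enforced through leases. Until the dead writer's lease expires, the append fails with AlreadyBeingCreatedException, which takeOwnership() now maps to null, so a caller has to poll. A sketch of such a helper (ours, not part of the patch; assumes default HDFS lease semantics):

    // Poll for an expired lock, tolerating nulls while the previous
    // owner's HDFS lease is still outstanding.
    static FileLock pollForExpiredLock(FileSystem fs, Path locksDir,
                                       int lockTimeoutSec, String spoutId,
                                       long giveUpAfterMsec)
            throws IOException, InterruptedException {
      long deadline = System.currentTimeMillis() + giveUpAfterMsec;
      while (System.currentTimeMillis() < deadline) {
        FileLock lock =
            FileLock.acquireOldestExpiredLock(fs, locksDir, lockTimeoutSec, spoutId);
        if (lock != null)
          return lock;
        Thread.sleep(1000); // lease may not have expired yet; retry
      }
      return null; // nothing recoverable within the budget
    }
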
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 9200c90..d967572 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -175,17 +175,17 @@ public class TestHdfsSpout {
     ArrayList<String> result = new ArrayList<>();
 
     for (Path seqFile : seqFiles) {
-      FSDataInputStream istream = fs.open(seqFile);
+      Path file = new Path(fs.getUri().toString() + seqFile.toString());
+      SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
       try {
-        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFile));
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
         Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-        while (reader.next(key, value) ) {
-          String keyValStr = Arrays.asList(key,value).toString();
+        while (reader.next(key, value)) {
+          String keyValStr = Arrays.asList(key, value).toString();
           result.add(keyValStr);
         }
       } finally {
-        istream.close();
+        reader.close();
       }
     }// for
     return result;
@@ -235,7 +235,7 @@ public class TestHdfsSpout {
       System.err.println(re);
     }
 
-    listDir(source);
+    listDir(archive);
 
 
     Path f1 = new Path(archive + "/file1.seq");
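
The sequence-file change in the first hunk deserves a note: the old code opened an FSDataInputStream it never used, built the reader from a path that could resolve against the default (local) filesystem instead of the MiniDFS cluster, and closed the unused stream while leaking the reader. A sketch of the corrected pattern under the standard Hadoop SequenceFile API; the wrapper class and method are ours, and makeQualified() is an equivalent, slightly tidier way to fully qualify the path than string concatenation:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.util.ReflectionUtils;

    class SeqFileDump {
      static List<String> dump(FileSystem fs, Configuration conf, Path seqFile)
              throws IOException {
        // Qualify against the test cluster's URI so the reader doesn't fall
        // back to the default filesystem.
        Path qualified = seqFile.makeQualified(fs.getUri(), fs.getWorkingDirectory());
        List<String> result = new ArrayList<>();
        SequenceFile.Reader reader =
            new SequenceFile.Reader(conf, SequenceFile.Reader.file(qualified));
        try {
          Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
          Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
          while (reader.next(key, value)) {
            result.add(key + " = " + value);
          }
        } finally {
          reader.close(); // the reader owns the underlying stream
        }
        return result;
      }
    }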