You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:02 UTC
[11/20] storm git commit: More tests and tests for FileLock. fixing
UT TestHdfsSpout.testSimpleSequenceFile
More tests and tests for FileLock. fixing UT TestHdfsSpout.testSimpleSequenceFile
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de37de68
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de37de68
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de37de68
Branch: refs/heads/master
Commit: de37de68f3a97c6e0d4d6aa38f972fd8d1ecb032
Parents: dcc930b
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Mon Dec 14 16:19:54 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800
----------------------------------------------------------------------
.../org/apache/storm/hdfs/common/HdfsUtils.java | 1 -
.../org/apache/storm/hdfs/spout/FileLock.java | 47 +++-
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 2 +-
.../apache/storm/hdfs/spout/TestDirLock.java | 13 +-
.../apache/storm/hdfs/spout/TestFileLock.java | 273 ++++++++++++++++++-
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 12 +-
6 files changed, 314 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index e8c32aa..0574c6a 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.storm.hdfs.spout.DirLock;
import java.io.IOException;
import java.util.ArrayList;
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 1974e44..76a459d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -23,12 +23,13 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.storm.hdfs.common.HdfsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
@@ -44,12 +45,12 @@ public class FileLock {
private final FileSystem fs;
private final String componentID;
private final Path lockFile;
- private final DataOutputStream lockFileStream;
+ private final FSDataOutputStream lockFileStream;
private LogEntry lastEntry;
private static final Logger log = LoggerFactory.getLogger(DirLock.class);
- private FileLock(FileSystem fs, Path lockFile, DataOutputStream lockFileStream, String spoutId)
+ private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId)
throws IOException {
this.fs = fs;
this.lockFile = lockFile;
@@ -83,7 +84,8 @@ public class FileLock {
lockFileStream.writeBytes(System.lineSeparator() + line);
else
lockFileStream.writeBytes(line);
- lockFileStream.flush();
+ lockFileStream.hflush();
+
lastEntry = entry; // update this only after writing to hdfs
}
@@ -125,7 +127,8 @@ public class FileLock {
*/
public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
throws IOException {
- if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) {
+ long modifiedTime = fs.getFileStatus(lockFile).getModificationTime();
+ if( modifiedTime <= olderThan ) { // look
//Impt: HDFS timestamp may not reflect recent appends, so we double check the
// timestamp in last line of file to see when the last update was made
LogEntry lastEntry = getLastEntry(fs, lockFile);
@@ -158,18 +161,28 @@ public class FileLock {
// takes ownership of the lock file
/**
- * Takes ownership of the lock file.
+ * Takes ownership of the lock file if possible.
* @param lockFile
* @param lastEntry last entry in the lock file. this param is an optimization.
* we dont scan the lock file again to find its last entry here since
* its already been done once by the logic used to check if the lock
* file is stale. so this value comes from that earlier scan.
* @param spoutId spout id
- * @return
+ * @throws IOException if unable to acquire
+ * @return null if lock File is being used by another thread
*/
public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
throws IOException {
- return new FileLock(fs, lockFile, spoutId, lastEntry);
+ try {
+ return new FileLock(fs, lockFile, spoutId, lastEntry);
+ } catch (RemoteException e) {
+ if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
+ log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile);
+ return null;
+ } else { // unexpected error
+ throw e;
+ }
+ }
}
/**
@@ -188,15 +201,19 @@ public class FileLock {
long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
- // locate oldest expired lock file (if any) and take ownership
+ // locate expired lock files (if any). Try to take ownership (oldest lock first)
for (Path file : listing) {
if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
continue;
LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
- if(lastEntry!=null)
- return FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+ if(lastEntry!=null) {
+ FileLock lock = FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+ if(lock!=null)
+ return lock;
+ }
}
- log.info("No abandoned files found");
+ if(listing.isEmpty())
+ log.info("No abandoned files to be refound");
return null;
}
@@ -209,14 +226,14 @@ public class FileLock {
* @param fs
* @param lockFilesDir
* @param locktimeoutSec
- * @param spoutId
* @return a Pair<lock file path, last entry in lock file> .. if expired lock file found
* @throws IOException
*/
- public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+ public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec)
throws IOException {
// list files
- long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+ long now = System.currentTimeMillis();
+ long olderThan = now - (locktimeoutSec*1000);
Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
// locate oldest expired lock file (if any) and take ownership
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index d8aa3f4..7977b96 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -504,7 +504,7 @@ public class HdfsSpout extends BaseRichSpout {
// 3 - if clocks are not in sync ..
if( lastExpiredLock == null ) {
// just make a note of the oldest expired lock now and check if its still unmodified after lockTimeoutSec
- lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+ lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec);
lastExpiredLockTime = System.currentTimeMillis();
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index bdb0cdf..667248e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -127,18 +127,21 @@ public class TestDirLock {
class DirLockingThread extends Thread {
+ private int thdNum;
private final FileSystem fs;
private final Path dir;
public boolean cleanExit = false;
- public DirLockingThread(int thdNum,FileSystem fs, Path dir) throws IOException {
+ public DirLockingThread(int thdNum,FileSystem fs, Path dir)
+ throws IOException {
+ this.thdNum = thdNum;
this.fs = fs;
this.dir = dir;
- Thread.currentThread().setName("DirLockingThread-" + thdNum);
}
@Override
public void run() {
+ Thread.currentThread().setName("DirLockingThread-" + thdNum);
DirLock lock = null;
try {
do {
@@ -146,7 +149,7 @@ public class TestDirLock {
lock = DirLock.tryLock(fs, dir);
System.err.println("Acquired lock " + getName());
if(lock==null) {
- System.out.println("Retrying lock - " + Thread.currentThread().getId());
+ System.out.println("Retrying lock - " + getName());
}
} while (lock==null);
cleanExit= true;
@@ -164,7 +167,7 @@ public class TestDirLock {
}
}
System.err.println("Thread exiting " + getName());
- }
+ } // run()
- }
+ } // class DirLockingThread
}
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 8031041..1f22a5b 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -20,10 +20,12 @@ package org.apache.storm.hdfs.spout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.storm.hdfs.common.HdfsUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -31,7 +33,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
public class TestFileLock {
@@ -41,8 +47,8 @@ public class TestFileLock {
static String hdfsURI;
static HdfsConfiguration conf = new HdfsConfiguration();
- private Path filesDir = new Path("/tmp/lockdir");
- private Path locksDir = new Path("/tmp/lockdir");
+ private Path filesDir = new Path("/tmp/filesdir");
+ private Path locksDir = new Path("/tmp/locskdir");
@BeforeClass
public static void setupClass() throws IOException {
@@ -70,7 +76,7 @@ public class TestFileLock {
}
@Test
- public void testBasic() throws Exception {
+ public void testBasicLocking() throws Exception {
// create empty files in filesDir
Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
@@ -82,7 +88,7 @@ public class TestFileLock {
Assert.assertNotNull(lock1a);
Assert.assertTrue(fs.exists(lock1a.getLockFile()));
Assert.assertEquals(lock1a.getLockFile().getParent(), locksDir); // verify lock file location
- Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // very lock filename
+ Assert.assertEquals(lock1a.getLockFile().getName(), file1.getName()); // verify lock filename
// acquire another lock on file1 and verify it failed
FileLock lock1b = FileLock.tryLock(fs, file1, locksDir, "spout1");
@@ -97,14 +103,14 @@ public class TestFileLock {
Assert.assertNotNull(lock1c);
Assert.assertTrue(fs.exists(lock1c.getLockFile()));
Assert.assertEquals(lock1c.getLockFile().getParent(), locksDir); // verify lock file location
- Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // very lock filename
+ Assert.assertEquals(lock1c.getLockFile().getName(), file1.getName()); // verify lock filename
// try locking another file2 at the same time
FileLock lock2a = FileLock.tryLock(fs, file2, locksDir, "spout1");
Assert.assertNotNull(lock2a);
Assert.assertTrue(fs.exists(lock2a.getLockFile()));
Assert.assertEquals(lock2a.getLockFile().getParent(), locksDir); // verify lock file location
- Assert.assertEquals(lock2a.getLockFile().getName(), file1.getName()); // very lock filename
+ Assert.assertEquals(lock2a.getLockFile().getName(), file2.getName()); // verify lock filename
// release both locks
lock2a.release();
@@ -113,5 +119,260 @@ public class TestFileLock {
Assert.assertFalse(fs.exists(lock1c.getLockFile()));
}
+ @Test
+ public void testHeartbeat() throws Exception {
+ Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+ fs.create(file1).close();
+
+ // acquire lock on file1
+ FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ Assert.assertNotNull(lock1);
+ Assert.assertTrue(fs.exists(lock1.getLockFile()));
+
+ ArrayList<String> lines = readTextFile(lock1.getLockFile());
+ Assert.assertEquals("heartbeats appear to be missing", 1, lines.size());
+
+ // heartbeat upon it
+ lock1.heartbeat("1");
+ lock1.heartbeat("2");
+ lock1.heartbeat("3");
+
+ lines = readTextFile(lock1.getLockFile());
+ Assert.assertEquals("heartbeats appear to be missing", 4, lines.size());
+
+ lock1.heartbeat("4");
+ lock1.heartbeat("5");
+ lock1.heartbeat("6");
+
+ lines = readTextFile(lock1.getLockFile());
+ Assert.assertEquals("heartbeats appear to be missing", 7, lines.size());
+
+ lock1.release();
+ lines = readTextFile(lock1.getLockFile());
+ Assert.assertNull(lines);
+ Assert.assertFalse(fs.exists(lock1.getLockFile()));
+ }
+
+ @Test
+ public void testConcurrentLocking() throws IOException, InterruptedException {
+ Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+ fs.create(file1).close();
+
+ FileLockingThread[] thds = startThreads(100, file1, locksDir);
+ for (FileLockingThread thd : thds) {
+ thd.join();
+ if( !thd.cleanExit)
+ System.err.println(thd.getName() + " did not exit cleanly");
+ Assert.assertTrue(thd.cleanExit);
+ }
+
+ Path lockFile = new Path(locksDir + Path.SEPARATOR + file1.getName());
+ Assert.assertFalse(fs.exists(lockFile));
+ }
+
+ private FileLockingThread[] startThreads(int thdCount, Path fileToLock, Path locksDir)
+ throws IOException {
+ FileLockingThread[] result = new FileLockingThread[thdCount];
+ for (int i = 0; i < thdCount; i++) {
+ result[i] = new FileLockingThread(i, fs, fileToLock, locksDir, "spout" + Integer.toString(i));
+ }
+
+ for (FileLockingThread thd : result) {
+ thd.start();
+ }
+ return result;
+ }
+
+
+ @Test
+ public void testStaleLockDetection_SingleLock() throws Exception {
+ final int LOCK_EXPIRY_SEC = 1;
+ final int WAIT_MSEC = 1500;
+ Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+ fs.create(file1).close();
+ FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ try {
+ // acquire lock on file1
+ Assert.assertNotNull(lock1);
+ Assert.assertTrue(fs.exists(lock1.getLockFile()));
+ Thread.sleep(WAIT_MSEC); // wait for lock to expire
+ HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+ Assert.assertNotNull(expired);
+
+ // heartbeat, ensure it's no longer stale and read back the heartbeat data
+ lock1.heartbeat("1");
+ expired = FileLock.locateOldestExpiredLock(fs, locksDir, 1);
+ Assert.assertNull(expired);
+
+ FileLock.LogEntry lastEntry = lock1.getLastLogEntry();
+ Assert.assertNotNull(lastEntry);
+ Assert.assertEquals("1", lastEntry.fileOffset);
+
+ // wait and check for expiry again
+ Thread.sleep(WAIT_MSEC);
+ expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+ Assert.assertNotNull(expired);
+ } finally {
+ lock1.release();
+ fs.delete(file1, false);
+ }
+ }
+
+ @Test
+ public void testStaleLockDetection_MultipleLocks() throws Exception {
+ final int LOCK_EXPIRY_SEC = 1;
+ final int WAIT_MSEC = 1500;
+ Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+ Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+ Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
+
+ fs.create(file1).close();
+ fs.create(file2).close();
+ fs.create(file3).close();
+
+ // 1) acquire locks on file1,file2,file3
+ FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
+ FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
+ Assert.assertNotNull(lock1);
+ Assert.assertNotNull(lock2);
+ Assert.assertNotNull(lock3);
+
+ try {
+ HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+ Assert.assertNull(expired);
+
+ // 2) wait for all 3 locks to expire then heart beat on 2 locks and verify stale lock
+ Thread.sleep(WAIT_MSEC);
+ lock1.heartbeat("1");
+ lock2.heartbeat("1");
+
+ expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+ Assert.assertNotNull(expired);
+ Assert.assertEquals("spout3", expired.getValue().componentID);
+ } finally {
+ lock1.release();
+ lock2.release();
+ lock3.release();
+ fs.delete(file1, false);
+ fs.delete(file2, false);
+ fs.delete(file3, false);
+ }
+ }
+
+ @Test
+ public void testStaleLockRecovery() throws Exception {
+ final int LOCK_EXPIRY_SEC = 1;
+ final int WAIT_MSEC = 1500;
+ Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
+ Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
+ Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
+
+ fs.create(file1).close();
+ fs.create(file2).close();
+ fs.create(file3).close();
+
+ // 1) acquire locks on file1,file2,file3
+ FileLock lock1 = FileLock.tryLock(fs, file1, locksDir, "spout1");
+ FileLock lock2 = FileLock.tryLock(fs, file2, locksDir, "spout2");
+ FileLock lock3 = FileLock.tryLock(fs, file3, locksDir, "spout3");
+ Assert.assertNotNull(lock1);
+ Assert.assertNotNull(lock2);
+ Assert.assertNotNull(lock3);
+
+ try {
+ HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
+ Assert.assertNull(expired);
+
+ // 2) wait for all 3 locks to expire then heart beat on 2 locks
+ Thread.sleep(WAIT_MSEC);
+ lock1.heartbeat("1");
+ lock2.heartbeat("1");
+
+ //todo: configure the HDFS lease timeout
+
+ // 3) Take ownership of stale lock
+ FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1");
+// Assert.assertNotNull(lock3b);
+// Assert.assertEquals("Expected lock3 file", lock3b.getLockFile(), lock3.getLockFile());
+ }finally {
+ lock1.release();
+ lock2.release();
+ lock3.release();
+ fs.delete(file1, false);
+ fs.delete(file2, false);
+ fs.delete(file3, false);
+ }
+ }
+
+ /** return null if file not found */
+ private ArrayList<String> readTextFile(Path file) throws IOException {
+ FSDataInputStream os = null;
+ try {
+ os = fs.open(file);
+ if (os == null)
+ return null;
+ BufferedReader reader = new BufferedReader(new InputStreamReader(os));
+ ArrayList<String> lines = new ArrayList<>();
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ lines.add(line);
+ }
+ return lines;
+ } catch( FileNotFoundException e) {
+ return null;
+ } finally {
+ if(os!=null)
+ os.close();
+ }
+ }
+
+ class FileLockingThread extends Thread {
+
+ private int thdNum;
+ private final FileSystem fs;
+ public boolean cleanExit = false;
+ private Path fileToLock;
+ private Path locksDir;
+ private String spoutId;
+
+ public FileLockingThread(int thdNum, FileSystem fs, Path fileToLock, Path locksDir, String spoutId)
+ throws IOException {
+ this.thdNum = thdNum;
+ this.fs = fs;
+ this.fileToLock = fileToLock;
+ this.locksDir = locksDir;
+ this.spoutId = spoutId;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("FileLockingThread-" + thdNum);
+ FileLock lock = null;
+ try {
+ do {
+ System.err.println("Trying lock - " + getName());
+ lock = FileLock.tryLock(fs, this.fileToLock, this.locksDir, spoutId);
+ System.err.println("Acquired lock - " + getName());
+ if(lock==null) {
+ System.out.println("Retrying lock - " + getName());
+ }
+ } while (lock==null);
+ cleanExit= true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ try {
+ if(lock!=null) {
+ lock.release();
+ System.err.println("Released lock - " + getName());
+ }
+ } catch (IOException e) {
+ e.printStackTrace(System.err);
+ }
+ }
+ System.err.println("Thread exiting - " + getName());
+ } // run()
+ } // class FileLockingThread
}
http://git-wip-us.apache.org/repos/asf/storm/blob/de37de68/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 9200c90..d967572 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -175,17 +175,17 @@ public class TestHdfsSpout {
ArrayList<String> result = new ArrayList<>();
for (Path seqFile : seqFiles) {
- FSDataInputStream istream = fs.open(seqFile);
+ Path file = new Path(fs.getUri().toString() + seqFile.toString());
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
try {
- SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFile));
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value) ) {
- String keyValStr = Arrays.asList(key,value).toString();
+ while (reader.next(key, value)) {
+ String keyValStr = Arrays.asList(key, value).toString();
result.add(keyValStr);
}
} finally {
- istream.close();
+ reader.close();
}
}// for
return result;
@@ -235,7 +235,7 @@ public class TestHdfsSpout {
System.err.println(re);
}
- listDir(source);
+ listDir(archive);
Path f1 = new Path(archive + "/file1.seq");