Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:03 UTC
[12/20] storm git commit: Adding stale lock recovery in DirLock.

Adding stale lock recovery in DirLock. Added tests for FileLock recovery,
DirLock recovery, commit_freq_sec & commit_freq_count, TestHdfsSpout.testLocking
and TestHdfsSemantics, and addressed assorted review comments.
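In caller terms, the recovery added here mirrors the HdfsSpout change further down in this patch: try the normal lock first, and fall back to reclaiming a stale one. A minimal sketch (hdfs, lockDirPath and lockTimeoutSec are assumed fields, as in HdfsSpout):

    DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
    if (dirlock == null) {
        // tryLock failed: the lock file may belong to a crashed spout. Reclaim it
        // only if its modification time is older than the configured timeout.
        dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
        if (dirlock == null) {
            return null; // lock is held and healthy; try again later
        }
    }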
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/152856d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/152856d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/152856d1
Branch: refs/heads/master
Commit: 152856d1156065f51430497629ee37412ac098b2
Parents: de37de6
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Dec 17 14:19:54 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800
----------------------------------------------------------------------
.../hdfs/common/CmpFilesByModificationTime.java | 6 +-
.../org/apache/storm/hdfs/common/HdfsUtils.java | 8 +-
.../org/apache/storm/hdfs/spout/Configs.java | 4 +-
.../org/apache/storm/hdfs/spout/DirLock.java | 37 +++-
.../org/apache/storm/hdfs/spout/FileLock.java | 16 +-
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 39 ++--
.../apache/storm/hdfs/spout/TestDirLock.java | 40 ++--
.../apache/storm/hdfs/spout/TestFileLock.java | 33 ++-
.../storm/hdfs/spout/TestHdfsSemantics.java | 204 +++++++++++++++++++
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 155 ++++++++++----
.../src/test/resources/log4j.properties | 26 +++
11 files changed, 473 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
index acee9a5..67420aa 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
@@ -18,15 +18,15 @@
package org.apache.storm.hdfs.common;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
import java.util.Comparator;
public class CmpFilesByModificationTime
- implements Comparator<LocatedFileStatus> {
+ implements Comparator<FileStatus> {
@Override
- public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
+ public int compare(FileStatus o1, FileStatus o2) {
return new Long(o1.getModificationTime()).compareTo( o2.getModificationTime() );
}
}
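Widening the comparator from LocatedFileStatus to FileStatus lets it order any directory listing by age. A minimal usage sketch (the listing helper is hypothetical, standing in for what HdfsUtils does below):

    import org.apache.hadoop.fs.FileStatus;
    import java.util.ArrayList;
    import java.util.Collections;

    ArrayList<FileStatus> fstats = fetchDirListing();            // hypothetical helper returning the listing
    Collections.sort(fstats, new CmpFilesByModificationTime());  // oldest files first, so expired locks are tried in age order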
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index 0574c6a..e8df78d 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -29,13 +29,12 @@ import org.apache.hadoop.ipc.RemoteException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
public class HdfsUtils {
/** list files sorted by modification time that have not been modified since 'olderThan'. if
* 'olderThan' is <= 0 then the filtering is disabled */
- public static Collection<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
+ public static ArrayList<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
throws IOException {
ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
@@ -43,7 +42,7 @@ public class HdfsUtils {
while( itr.hasNext() ) {
LocatedFileStatus fileStatus = itr.next();
if(olderThan>0) {
- if( fileStatus.getModificationTime()<olderThan )
+ if( fileStatus.getModificationTime()<=olderThan )
fstats.add(fileStatus);
}
else {
@@ -69,7 +68,7 @@ public class HdfsUtils {
} catch (FileAlreadyExistsException e) {
return null;
} catch (RemoteException e) {
- if( e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) ) {
+ if( e.unwrapRemoteException() instanceof AlreadyBeingCreatedException ) {
return null;
} else { // unexpected error
throw e;
@@ -77,7 +76,6 @@ public class HdfsUtils {
}
}
-
public static class Pair<K,V> {
private K key;
private V value;
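The switch from comparing class names to unwrapRemoteException() is the idiomatic way to detect the server-side exception type. In isolation, the pattern looks roughly like this (a sketch paraphrasing tryCreateFile; imports as in TestHdfsSemantics below):

    FSDataOutputStream tryCreate(FileSystem fs, Path path) throws IOException {
        try {
            return fs.create(path, false); // atomic no-overwrite create; fails if the file exists
        } catch (FileAlreadyExistsException e) {
            return null; // lock file already present
        } catch (RemoteException e) {
            // unwrapRemoteException() rebuilds the original server-side exception object,
            // which is sturdier than string-comparing class names
            if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
                return null; // file is open for create elsewhere, i.e. the lock is held
            }
            throw e; // unexpected error
        }
    }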
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
index 9a9ae73..93d775b 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -27,8 +27,8 @@ public class Configs {
public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // completed files will be moved here
public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // unparseable files will be moved here
public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created
- public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records
- public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs
+ public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records. 0 disables this.
+ public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs. cannot be disabled.
public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";
public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout
public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync
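A hedged example of setting the two commit knobs (key names from this file; the values are illustrative, matching what the new tests below use):

    Map<String, Object> conf = new HashMap<>();
    conf.put(Configs.COMMIT_FREQ_COUNT, "1");   // commit to the lock file after every acked tuple; "0" disables this
    conf.put(Configs.COMMIT_FREQ_SEC, "100");   // time-based commit; set large here so only the count applies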
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
index 0ff2f37..06ca749 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
- * Facility to sychronize access to HDFS directory. The lock itself is represented
+ * Facility to synchronize access to HDFS directory. The lock itself is represented
* as a file in the same directory. Relies on atomic file creation.
*/
public class DirLock {
@@ -51,7 +51,7 @@ public class DirLock {
* @throws IOException if there were errors
*/
public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
- Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+ Path lockFile = getDirLockFile(dir);
try {
FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
@@ -69,6 +69,10 @@ public class DirLock {
}
}
+ private static Path getDirLockFile(Path dir) {
+ return new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+ }
+
private static String threadInfo () {
return "ThdId=" + Thread.currentThread().getId() + ", ThdName="
+ Thread.currentThread().getName();
@@ -80,6 +84,35 @@ public class DirLock {
log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
}
+ /** if the lock on the directory is stale, take ownership */
+ public static DirLock takeOwnershipIfStale(FileSystem fs, Path dirToLock, int lockTimeoutSec) {
+ Path dirLockFile = getDirLockFile(dirToLock);
+
+ long now = System.currentTimeMillis();
+ long expiryTime = now - (lockTimeoutSec*1000);
+
+ try {
+ long modTime = fs.getFileStatus(dirLockFile).getModificationTime();
+ if(modTime <= expiryTime)
+ return takeOwnership(fs, dirLockFile);
+ return null;
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+
+ private static DirLock takeOwnership(FileSystem fs, Path dirLockFile) throws IOException {
+ // delete and recreate lock file
+ if( fs.delete(dirLockFile, false) ) { // returns false if somebody else already deleted it (to take ownership)
+ FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, dirLockFile);
+ if(ostream!=null)
+ ostream.close();
+ return new DirLock(fs, dirLockFile);
+ }
+ return null;
+ }
+
public Path getLockFile() {
return lockFile;
}
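The new TestDirLock.testLockRecovery below walks this lifecycle; in miniature (timeout and sleep values illustrative):

    DirLock owner = DirLock.tryLock(fs, locksDir);                  // normal acquisition
    DirLock thief = DirLock.takeOwnershipIfStale(fs, locksDir, 1);  // refused (null): lock not stale yet
    Thread.sleep(1500);                                             // exceed the 1 second timeout
    thief = DirLock.takeOwnershipIfStale(fs, locksDir, 1);          // reclaim now succeeds
    thief.release();
    owner.release(); // deleting the already-removed lock file returns false rather than throwing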
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index 76a459d..b40d1dd 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -92,6 +92,12 @@ public class FileLock {
public void release() throws IOException {
lockFileStream.close();
fs.delete(lockFile, false);
+ log.debug("Released lock file {}", lockFile);
+ }
+
// for testing only; invoked via reflection
+ private void forceCloseLockFile() throws IOException {
+ lockFileStream.close();
}
/** returns lock on file or null if file is already locked. throws if unexpected problem */
@@ -135,6 +141,7 @@ public class FileLock {
if(lastEntry==null) {
throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
}
+ log.debug("{} , lastModified= {}, expiryTime= {}, diff= {}", lockFile, lastEntry.eventTime, olderThan, lastEntry.eventTime-olderThan );
if( lastEntry.eventTime <= olderThan )
return lastEntry;
}
@@ -176,8 +183,8 @@ public class FileLock {
try {
return new FileLock(fs, lockFile, spoutId, lastEntry);
} catch (RemoteException e) {
- if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
- log.info("Lock file {} is currently open. cannot transfer ownership on.", lockFile);
+ if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
+ log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile);
return null;
} else { // unexpected error
throw e;
@@ -198,7 +205,8 @@ public class FileLock {
public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
throws IOException {
// list files
- long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+ long now = System.currentTimeMillis();
+ long olderThan = now - (locktimeoutSec*1000);
Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
// locate expired lock files (if any). Try to take ownership (oldest lock first)
@@ -213,7 +221,7 @@ public class FileLock {
}
}
if(listing.isEmpty())
- log.info("No abandoned files to be refound");
+ log.info("No abandoned lock files found");
return null;
}
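Downstream, the spout consumes this as shown in the HdfsSpout hunk below; roughly (a sketch; hdfs, lockDirPath, lockTimeoutSec and spoutId are assumed fields):

    FileLock lock = FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
    if (lock != null) {
        // resume the abandoned file from the offset the previous owner last logged
        String resumeFromOffset = lock.getLastLogEntry().fileOffset;
        // ... reopen the data file at resumeFromOffset and continue emitting
    }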
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 7977b96..50c2172 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -124,14 +124,14 @@ public class HdfsSpout extends BaseRichSpout {
return;
}
- // 2) If no failed tuples, then send tuples from hdfs
+ // 2) If no failed tuples to be retried, then send tuples from hdfs
while (true) {
try {
// 3) Select a new file if one is not open already
if (reader == null) {
reader = pickNextFile();
if (reader == null) {
- LOG.info("Currently no new files to process under : " + sourceDirPath);
+ LOG.debug("Currently no new files to process under : " + sourceDirPath);
return;
}
}
@@ -165,8 +165,9 @@ public class HdfsSpout extends BaseRichSpout {
LOG.error("Parsing error when processing at file location " + getFileProgress(reader) +
". Skipping remainder of file.", e);
markFileAsBad(reader.getFilePath());
- // note: Unfortunately not emitting anything here due to parse error
- // will trigger the configured spout wait strategy which is unnecessary
+ // Note: We don't return from this method on ParseException to avoid triggering the
+ // spout wait strategy (due to no emits). Instead we go back into the loop and
+ // generate a tuple from next file
}
}
@@ -192,7 +193,7 @@ public class HdfsSpout extends BaseRichSpout {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
- commitTimeElapsed.set(false);
+ commitTimeElapsed.set(true);
}
};
commitTimer.schedule(timerTask, commitFrequencySec * 1000);
@@ -206,7 +207,8 @@ public class HdfsSpout extends BaseRichSpout {
private void markFileAsDone(Path filePath) {
fileReadCompletely = false;
try {
- renameCompletedFile(reader.getFilePath());
+ Path newFile = renameCompletedFile(reader.getFilePath());
+ LOG.info("Completed processing {}", newFile);
} catch (IOException e) {
LOG.error("Unable to archive completed file" + filePath, e);
}
@@ -220,7 +222,7 @@ public class HdfsSpout extends BaseRichSpout {
String originalName = new Path(fileNameMinusSuffix).getName();
Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
- LOG.info("Moving bad file to " + newFile);
+ LOG.info("Moving bad file {} to {} ", originalName, newFile);
try {
if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
@@ -254,7 +256,7 @@ public class HdfsSpout extends BaseRichSpout {
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
final String FILE_SYSTEM = "filesystem";
- LOG.info("Opening");
+ LOG.info("Opening HDFS Spout");
this.collector = collector;
this.hdfsConfig = new Configuration();
this.tupleCounter = 0;
@@ -436,7 +438,8 @@ public class HdfsSpout extends BaseRichSpout {
}
private boolean canCommitNow() {
- if( acksSinceLastCommit >= commitFrequencyCount )
+
+ if( commitFrequencyCount>0 && acksSinceLastCommit >= commitFrequencyCount )
return true;
return commitTimeElapsed.get();
}
@@ -455,7 +458,7 @@ public class HdfsSpout extends BaseRichSpout {
if (lock != null) {
Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
String resumeFromOffset = lock.getLastLogEntry().fileOffset;
- LOG.info("Processing abandoned file : {}", file);
+ LOG.info("Resuming processing of abandoned file : {}", file);
return createFileReader(file, resumeFromOffset);
}
@@ -468,12 +471,12 @@ public class HdfsSpout extends BaseRichSpout {
if( file.getName().endsWith(ignoreSuffix) )
continue;
- LOG.info("Processing : {} ", file);
lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
if( lock==null ) {
- LOG.info("Unable to get lock, so skipping file: {}", file);
+ LOG.debug("Unable to get lock, so skipping file: {}", file);
continue; // could not lock, so try another file.
}
+ LOG.info("Processing : {} ", file);
Path newFile = renameSelectedFile(file);
return createFileReader(newFile);
}
@@ -494,8 +497,11 @@ public class HdfsSpout extends BaseRichSpout {
private FileLock getOldestExpiredLock() throws IOException {
// 1 - acquire lock on dir
DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
- if (dirlock == null)
- return null;
+ if (dirlock == null) {
+ dirlock = DirLock.takeOwnershipIfStale(hdfs, lockDirPath, lockTimeoutSec);
+ if (dirlock == null)
+ return null;
+ }
try {
// 2 - if clocks are in sync then simply take ownership of the oldest expired lock
if (clocksInSync)
@@ -606,14 +612,15 @@ public class HdfsSpout extends BaseRichSpout {
}
+ // renames files and returns the new file path
private Path renameCompletedFile(Path file) throws IOException {
String fileName = file.toString();
String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
String newName = new Path(fileNameMinusSuffix).getName();
Path newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
- LOG.debug("Renaming complete file to " + newFile);
- LOG.info("Completed file " + fileNameMinusSuffix );
+ LOG.debug("Renaming complete file to {} ", newFile);
+ LOG.info("Completed file {}", fileNameMinusSuffix );
if (!hdfs.rename(file, newFile) ) {
throw new IOException("Rename failed for file: " + file);
}
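Read together, these hunks give nextTuple() roughly the following shape (a heavily simplified sketch, not the verbatim method; emit() abbreviates the collector call and ack bookkeeping):

    while (true) {
        if (reader == null) {
            reader = pickNextFile();
            if (reader == null) { return; }        // nothing to read; let the wait strategy kick in
        }
        try {
            List<Object> tuple = reader.next();
            if (tuple != null) { emit(tuple); return; }
            markFileAsDone(reader.getFilePath());  // EOF: archive the file, log the new path
            reader = null;
        } catch (ParseException e) {
            markFileAsBad(reader.getFilePath());   // unparseable: move the file to the bad dir...
            reader = null;                         // ...and loop again instead of returning, so the
        }                                          // spout wait strategy is not triggered needlessly
    }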
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
index 667248e..a7b73d6 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -25,16 +25,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import java.io.IOException;
@@ -45,8 +41,8 @@ public class TestDirLock {
static FileSystem fs;
static String hdfsURI;
static HdfsConfiguration conf = new HdfsConfiguration();
-
- private Path lockDir = new Path("/tmp/lockdir");
+ static final int LOCK_EXPIRY_SEC = 1;
+ private Path locksDir = new Path("/tmp/lockdir");
@BeforeClass
public static void setupClass() throws IOException {
@@ -65,23 +61,23 @@ public class TestDirLock {
@Before
public void setUp() throws Exception {
- assert fs.mkdirs(lockDir) ;
+ assert fs.mkdirs(locksDir) ;
}
@After
public void tearDown() throws Exception {
- fs.delete(lockDir, true);
+ fs.delete(locksDir, true);
}
@Test
public void testBasicLocking() throws Exception {
// 1 grab lock
- DirLock lock = DirLock.tryLock(fs, lockDir);
+ DirLock lock = DirLock.tryLock(fs, locksDir);
Assert.assertTrue(fs.exists(lock.getLockFile()));
// 2 try to grab another lock while dir is locked
- DirLock lock2 = DirLock.tryLock(fs, lockDir); // should fail
+ DirLock lock2 = DirLock.tryLock(fs, locksDir); // should fail
Assert.assertNull(lock2);
// 3 let go first lock
@@ -89,7 +85,7 @@ public class TestDirLock {
Assert.assertFalse(fs.exists(lock.getLockFile()));
// 4 try locking again
- lock2 = DirLock.tryLock(fs, lockDir);
+ lock2 = DirLock.tryLock(fs, locksDir);
Assert.assertTrue(fs.exists(lock2.getLockFile()));
lock2.release();
Assert.assertFalse(fs.exists(lock.getLockFile()));
@@ -99,7 +95,7 @@ public class TestDirLock {
@Test
public void testConcurrentLocking() throws Exception {
- DirLockingThread[] thds = startThreads(100, lockDir );
+ DirLockingThread[] thds = startThreads(100, locksDir);
for (DirLockingThread thd : thds) {
thd.join();
if( !thd.cleanExit)
@@ -107,7 +103,7 @@ public class TestDirLock {
Assert.assertTrue(thd.cleanExit);
}
- Path lockFile = new Path(lockDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
+ Path lockFile = new Path(locksDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
Assert.assertFalse(fs.exists(lockFile));
}
@@ -124,6 +120,24 @@ public class TestDirLock {
return result;
}
+ @Test
+ public void testLockRecovery() throws Exception {
+ DirLock lock1 = DirLock.tryLock(fs, locksDir); // should pass
+ Assert.assertNotNull(lock1);
+
+ DirLock lock2 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should fail
+ Assert.assertNull(lock2);
+
+ Thread.sleep(LOCK_EXPIRY_SEC*1000 + 500); // wait for lock to expire
+ Assert.assertTrue(fs.exists(lock1.getLockFile()));
+
+ DirLock lock3 = DirLock.takeOwnershipIfStale(fs, locksDir, LOCK_EXPIRY_SEC); // should pass now
+ Assert.assertNotNull(lock3);
+ Assert.assertTrue(fs.exists(lock3.getLockFile()));
+ lock3.release();
+ Assert.assertFalse(fs.exists(lock3.getLockFile()));
+ lock1.release(); // should not throw
+ }
class DirLockingThread extends Thread {
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
index 1f22a5b..a97b3f2 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestFileLock.java
@@ -33,10 +33,12 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.lang.reflect.Method;
import java.util.ArrayList;
public class TestFileLock {
@@ -68,11 +70,13 @@ public class TestFileLock {
@Before
public void setUp() throws Exception {
assert fs.mkdirs(filesDir) ;
+ assert fs.mkdirs(locksDir) ;
}
@After
public void tearDown() throws Exception {
fs.delete(filesDir, true);
+ fs.delete(locksDir, true);
}
@Test
@@ -261,9 +265,9 @@ public class TestFileLock {
}
@Test
- public void testStaleLockRecovery() throws Exception {
+ public void testLockRecovery() throws Exception {
final int LOCK_EXPIRY_SEC = 1;
- final int WAIT_MSEC = 1500;
+ final int WAIT_MSEC = LOCK_EXPIRY_SEC*1000 + 500;
Path file1 = new Path(filesDir + Path.SEPARATOR + "file1");
Path file2 = new Path(filesDir + Path.SEPARATOR + "file2");
Path file3 = new Path(filesDir + Path.SEPARATOR + "file3");
@@ -284,27 +288,38 @@ public class TestFileLock {
HdfsUtils.Pair<Path, FileLock.LogEntry> expired = FileLock.locateOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC);
Assert.assertNull(expired);
+ // 1) Simulate lock file lease expiring and getting closed by HDFS
+ closeUnderlyingLockFile(lock3);
+
// 2) wait for all 3 locks to expire then heart beat on 2 locks
- Thread.sleep(WAIT_MSEC);
+ Thread.sleep(WAIT_MSEC*2); // wait for locks to expire
lock1.heartbeat("1");
lock2.heartbeat("1");
- //todo: configure the HDFS lease timeout
-
// 3) Take ownership of stale lock
FileLock lock3b = FileLock.acquireOldestExpiredLock(fs, locksDir, LOCK_EXPIRY_SEC, "spout1");
-// Assert.assertNotNull(lock3b);
-// Assert.assertEquals("Expected lock3 file", lock3b.getLockFile(), lock3.getLockFile());
- }finally {
+ Assert.assertNotNull(lock3b);
+ Assert.assertEquals("Expected lock3 file", Path.getPathWithoutSchemeAndAuthority(lock3b.getLockFile()), lock3.getLockFile());
+ } finally {
lock1.release();
lock2.release();
lock3.release();
fs.delete(file1, false);
fs.delete(file2, false);
- fs.delete(file3, false);
+ try {
+ fs.delete(file3, false);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
+ private void closeUnderlyingLockFile(FileLock lock) throws ReflectiveOperationException {
+ Method m = FileLock.class.getDeclaredMethod("forceCloseLockFile");
+ m.setAccessible(true);
+ m.invoke(lock);
+ }
+
/** return null if file not found */
private ArrayList<String> readTextFile(Path file) throws IOException {
FSDataInputStream os = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
new file mode 100644
index 0000000..6628cc9
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestHdfsSemantics {
+
+ static MiniDFSCluster.Builder builder;
+ static MiniDFSCluster hdfsCluster;
+ static FileSystem fs;
+ static String hdfsURI;
+ static HdfsConfiguration conf = new HdfsConfiguration();
+
+ private Path dir = new Path("/tmp/filesdir");
+
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ conf.set(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,"5000");
+ builder = new MiniDFSCluster.Builder(new Configuration());
+ hdfsCluster = builder.build();
+ fs = hdfsCluster.getFileSystem();
+ hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+ }
+
+ @AfterClass
+ public static void teardownClass() throws IOException {
+ fs.close();
+ hdfsCluster.shutdown();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ assert fs.mkdirs(dir) ;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.delete(dir, true);
+ }
+
+
+ @Test
+ public void testDeleteSemantics() throws Exception {
+ Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+// try {
+ // 1) Delete absent file - should return false
+ Assert.assertFalse(fs.exists(file));
+ try {
+ Assert.assertFalse(fs.delete(file, false));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // 2) deleting open file - should return true
+ fs.create(file, false);
+ Assert.assertTrue(fs.delete(file, false));
+
+ // 3) deleting closed file - should return true
+ FSDataOutputStream os = fs.create(file, false);
+ os.close();
+ Assert.assertTrue(fs.exists(file));
+ Assert.assertTrue(fs.delete(file, false));
+ Assert.assertFalse(fs.exists(file));
+ }
+
+ @Test
+ public void testConcurrentDeletion() throws Exception {
+ Path file = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+ fs.create(file).close();
+ // 1 concurrent deletion - only one thread should succeed
+ FileDeletionThread[] thds = startThreads(10, file);
+ int successCount=0;
+ for (FileDeletionThread thd : thds) {
+ thd.join();
+ if( thd.succeeded)
+ successCount++;
+ if(thd.exception!=null)
+ Assert.assertNotNull(thd.exception);
+ }
+ System.err.println(successCount);
+ Assert.assertEquals(1, successCount);
+
+ }
+
+ @Test
+ public void testAppendSemantics() throws Exception {
+ //1 try to append to an open file
+ Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+ FSDataOutputStream os1 = fs.create(file1, false);
+ try {
+ fs.append(file1); // should fail
+ Assert.assertTrue("Append did not throw an exception", false);
+ } catch (RemoteException e) {
+ // expecting AlreadyBeingCreatedException inside RemoteException
+ Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+ }
+
+ //2 try to append to a closed file
+ os1.close();
+ FSDataOutputStream os2 = fs.append(file1); // should pass
+ os2.close();
+ }
+
+ @Test
+ public void testDoubleCreateSemantics() throws Exception {
+ //1 create an already existing open file w/o override flag
+ Path file1 = new Path(dir.toString() + Path.SEPARATOR_CHAR + "file1");
+ FSDataOutputStream os1 = fs.create(file1, false);
+ try {
+ fs.create(file1, false); // should fail
+ Assert.assertTrue("Create did not throw an exception", false);
+ } catch (RemoteException e) {
+ Assert.assertEquals(AlreadyBeingCreatedException.class, e.unwrapRemoteException().getClass());
+ }
+ //2 close file and retry creation
+ os1.close();
+ try {
+ fs.create(file1, false); // should still fail
+ } catch (FileAlreadyExistsException e) {
+ // expecting this exception
+ }
+
+ //3 delete file and retry creation
+ fs.delete(file1, false);
+ FSDataOutputStream os2 = fs.create(file1, false); // should pass
+ Assert.assertNotNull(os2);
+ os2.close();
+ }
+
+
+ private FileDeletionThread[] startThreads(int thdCount, Path file)
+ throws IOException {
+ FileDeletionThread[] result = new FileDeletionThread[thdCount];
+ for (int i = 0; i < thdCount; i++) {
+ result[i] = new FileDeletionThread(i, fs, file);
+ }
+
+ for (FileDeletionThread thd : result) {
+ thd.start();
+ }
+ return result;
+ }
+
+ private static class FileDeletionThread extends Thread {
+
+ private final int thdNum;
+ private final FileSystem fs;
+ private final Path file;
+ public boolean succeeded;
+ public Exception exception = null;
+
+ public FileDeletionThread(int thdNum, FileSystem fs, Path file)
+ throws IOException {
+ this.thdNum = thdNum;
+ this.fs = fs;
+ this.file = file;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("FileDeletionThread-" + thdNum);
+ try {
+ succeeded = fs.delete(file, false);
+ } catch (Exception e) {
+ exception = e;
+ }
+ } // run()
+
+ } // class FileDeletionThread
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index d967572..98d21f8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -105,7 +105,7 @@ public class TestHdfsSpout {
@After
public void shutDown() throws IOException {
- fs.delete(new Path(baseFolder.toString()),true);
+ fs.delete(new Path(baseFolder.toString()), true);
}
@Test
@@ -134,7 +134,6 @@ public class TestHdfsSpout {
checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
}
-
private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
ArrayList<String> expected = new ArrayList<>();
for (Path txtFile : txtFiles) {
@@ -196,10 +195,6 @@ public class TestHdfsSpout {
listDir(archive);
}
- private List<String> listBadDir() throws IOException {
- return listDir(badfiles);
- }
-
private List<String> listDir(Path p) throws IOException {
ArrayList<String> result = new ArrayList<>();
System.err.println("*** Listing " + p);
@@ -207,7 +202,7 @@ public class TestHdfsSpout {
while ( fileNames.hasNext() ) {
LocatedFileStatus fileStatus = fileNames.next();
System.err.println(fileStatus.getPath());
- result.add(fileStatus.getPath().toString());
+ result.add(Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString());
}
return result;
}
@@ -244,50 +239,127 @@ public class TestHdfsSpout {
checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2);
}
-// - TODO: this test needs the spout to fail with an exception
@Test
- public void testFailure() throws Exception {
-
+ public void testReadFailures() throws Exception {
+ // 1) create couple of input files to read
Path file1 = new Path(source.toString() + "/file1.txt");
- createTextFile(file1, 5);
+ Path file2 = new Path(source.toString() + "/file2.txt");
- listDir(source);
+ createTextFile(file1, 6);
+ createTextFile(file2, 7);
+ Assert.assertEquals(2, listDir(source).size());
+ // 2) run spout
Map conf = getDefaultConfig();
-// conf.put(HdfsSpout.Configs.BACKOFF_SEC, "2");
HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName());
- List<String> res = runSpout(spout, "r3");
- for (String re : res) {
- System.err.println(re);
- }
-
- listCompletedDir();
- List<String> badFiles = listBadDir();
- Assert.assertEquals( badFiles.size(), 1);
- Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 1);
+ List<String> res = runSpout(spout, "r11");
+ String[] expected = new String[] {"[line 0]","[line 1]","[line 2]","[line 0]","[line 1]","[line 2]"};
+ Assert.assertArrayEquals(expected, res.toArray());
+
+ // 3) make sure 6 lines (3 from each file) were read in all
+ Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 6);
+ ArrayList<Path> badFiles = HdfsUtils.listFilesByModificationTime(fs, badfiles, 0);
+ Assert.assertEquals(badFiles.size(), 2);
}
- // @Test
+ // check lock creation/deletion and contents
+ @Test
public void testLocking() throws Exception {
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 10);
+
+ // 0) config spout to log progress in lock file for each tuple
+ Map conf = getDefaultConfig();
+ conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+ conf.put(Configs.COMMIT_FREQ_SEC, "100"); // make it irrelevant
+ HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+ // 1) read initial lines in file, then check if lock exists
+ List<String> res = runSpout(spout, "r5");
+ Assert.assertEquals(5, res.size());
+ List<String> lockFiles = listDir(spout.getLockDirPath());
+ Assert.assertEquals(1, lockFiles.size());
+
+ // 2) check log file content line count == tuples emitted + 1
+ List<String> lines = readTextFile(fs, lockFiles.get(0));
+ Assert.assertEquals(lines.size(), res.size()+1);
+
+ // 3) read remaining lines in file, then ensure lock is gone
+ runSpout(spout, "r6");
+ lockFiles = listDir(spout.getLockDirPath());
+ Assert.assertEquals(0, lockFiles.size());
+
+
+ // 4) --- Create another input file and reverify same behavior ---
+ Path file2 = new Path(source.toString() + "/file2.txt");
+ createTextFile(file2, 10);
+
+ // 5) read initial lines in file, then check if lock exists
+ res = runSpout(spout, "r5");
+ Assert.assertEquals(15, res.size());
+ lockFiles = listDir(spout.getLockDirPath());
+ Assert.assertEquals(1, lockFiles.size());
+
+ // 6) check log file content line count == tuples emitted + 1
+ lines = readTextFile(fs, lockFiles.get(0));
+ Assert.assertEquals(6, lines.size());
+
+ // 7) read remaining lines in file, then ensure lock is gone
+ runSpout(spout, "r6");
+ lockFiles = listDir(spout.getLockDirPath());
+ Assert.assertEquals(0, lockFiles.size());
+ }
+
+ @Test
+ public void testLockLoggingFreqCount() throws Exception {
Path file1 = new Path(source.toString() + "/file1.txt");
- createTextFile(file1, 5);
+ createTextFile(file1, 10);
- listDir(source);
+ // 0) config spout to log progress in lock file for each tuple
+ Map conf = getDefaultConfig();
+ conf.put(Configs.COMMIT_FREQ_COUNT, "2"); // 1 lock log entry every 2 tuples
+ conf.put(Configs.COMMIT_FREQ_SEC, "1000"); // make it irrelevant for this test
+ HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+ // 1) read 5 lines in file,
+ runSpout(spout, "r5");
+
+ // 2) check log file contents
+ String lockFile = listDir(spout.getLockDirPath()).get(0);
+ List<String> lines = readTextFile(fs, lockFile);
+ Assert.assertEquals(lines.size(), 3);
+
+ // 3) read 6th line and see if another log entry was made
+ runSpout(spout, "r1");
+ lines = readTextFile(fs, lockFile);
+ Assert.assertEquals(lines.size(), 4);
+ }
+ @Test
+ public void testLockLoggingFreqSec() throws Exception {
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 10);
+
+ // 0) config spout to log progress in lock file for each tuple
Map conf = getDefaultConfig();
- conf.put(Configs.COMMIT_FREQ_COUNT, "1");
- conf.put(Configs.COMMIT_FREQ_SEC, "1");
+ conf.put(Configs.COMMIT_FREQ_COUNT, "0"); // disable it
+ conf.put(Configs.COMMIT_FREQ_SEC, "2"); // log every 2 sec
+
HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
- List<String> res = runSpout(spout,"r4");
- for (String re : res) {
- System.err.println(re);
- }
- List<String> lockFiles = listDir(spout.getLockDirPath());
- Assert.assertEquals(1, lockFiles.size());
- runSpout(spout, "r3");
- List<String> lines = readTextFile(fs, lockFiles.get(0));
- System.err.println(lines);
- Assert.assertEquals(6, lines.size());
+
+ // 1) read 5 lines in file
+ runSpout(spout, "r5");
+
+ // 2) check log file contents
+ String lockFile = listDir(spout.getLockDirPath()).get(0);
+ List<String> lines = readTextFile(fs, lockFile);
+ Assert.assertEquals(lines.size(), 1);
+ Thread.sleep(3000); // allow freq_sec to expire
+
+ // 3) read another line and see if another log entry was made
+ runSpout(spout, "r1");
+ lines = readTextFile(fs, lockFile);
+ Assert.assertEquals(2, lines.size());
}
private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
@@ -320,7 +392,7 @@ public class TestHdfsSpout {
}
/**
- * Execute a sequence of calls to EventHubSpout.
+ * Execute a sequence of calls on HdfsSpout.
*
* @param cmds: set of commands to run,
* e.g. "r,r,r,r,a1,f2,...". The commands are:
@@ -427,7 +499,8 @@ public class TestHdfsSpout {
- // Throws exceptions for 2nd and 3rd line read attempt
+ // Throws IOException on the 3rd & 4th calls to next(), succeeds on the 5th, and thereafter
+ // throws ParseException. Effectively produces 3 lines (lines 0, 1 and 2) from each file read
static class MockTextFailingReader extends TextFileReader {
int readAttempts = 0;
@@ -438,9 +511,9 @@ public class TestHdfsSpout {
@Override
public List<Object> next() throws IOException, ParseException {
readAttempts++;
- if (readAttempts == 2) {
+ if (readAttempts == 3 || readAttempts ==4) {
throw new IOException("mock test exception");
- } else if (readAttempts >= 3) {
+ } else if (readAttempts > 5 ) {
throw new ParseException("mock test exception", null);
}
return super.next();
http://git-wip-us.apache.org/repos/asf/storm/blob/152856d1/external/storm-hdfs/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/resources/log4j.properties b/external/storm-hdfs/src/test/resources/log4j.properties
new file mode 100644
index 0000000..1f92e45
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+log4j.rootLogger = WARN, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.storm.hdfs = INFO
+