You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:09 UTC
[18/20] storm git commit: Addressing another review comment from Arun
about releasing lock file on exception.
Addressing another review comment from Arun about releasing lock file on exception.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2c02bc91
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2c02bc91
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2c02bc91
Branch: refs/heads/master
Commit: 2c02bc91d8a9b81b55a4e023c927a73068bcc927
Parents: 0b07f8b
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Thu Jan 7 19:23:19 2016 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:57 2016 -0800
----------------------------------------------------------------------
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 45 +++++++++++++-------
1 file changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2c02bc91/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 994d87e..5428570 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -508,9 +508,16 @@ public class HdfsSpout extends BaseRichSpout {
LOG.debug("Unable to get FileLock for {}, so skipping it.", file);
continue; // could not lock, so try another file.
}
- LOG.info("Processing : {} ", file);
- Path newFile = renameToInProgressFile(file);
- return createFileReader(newFile);
+ try {
+ Path newFile = renameToInProgressFile(file);
+ FileReader result = createFileReader(newFile);
+ LOG.info("Processing : {} ", file);
+ return result;
+ } catch (Exception e) {
+ LOG.error("Skipping file " + file, e);
+ releaseLockAndLog(lock, spoutId);
+ continue;
+ }
}
return null;
@@ -599,7 +606,7 @@ public class HdfsSpout extends BaseRichSpout {
return (FileReader) constructor.newInstance(this.hdfs, file, conf);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
- throw new RuntimeException("Unable to instantiate " + readerType, e);
+ throw new RuntimeException("Unable to instantiate " + readerType + " reader", e);
}
}
@@ -638,10 +645,14 @@ public class HdfsSpout extends BaseRichSpout {
private Path renameToInProgressFile(Path file)
throws IOException {
Path newFile = new Path( file.toString() + inprogress_suffix );
- if (hdfs.rename(file, newFile)) {
- return newFile;
+ try {
+ if (hdfs.rename(file, newFile)) {
+ return newFile;
+ }
+ throw new RenameException(file, newFile);
+ } catch (IOException e){
+ throw new RenameException(file, newFile, e);
}
- throw new IOException("Rename of " + file + " to " + newFile + " failed");
}
/** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
@@ -709,16 +720,20 @@ public class HdfsSpout extends BaseRichSpout {
}
}
- private static class RenameFailedException extends IOException {
- public final Path file;
- public RenameFailedException(Path file) {
- super("Rename failed for file: " + file);
- this.file = file;
+ private static class RenameException extends IOException {
+ public final Path oldFile;
+ public final Path newFile;
+
+ public RenameException(Path oldFile, Path newFile) {
+ super("Rename of " + oldFile + " to " + newFile + " failed");
+ this.oldFile = oldFile;
+ this.newFile = newFile;
}
- public RenameFailedException(Path file, IOException e) {
- super("Rename failed for file: " + file, e);
- this.file = file;
+ public RenameException(Path oldFile, Path newFile, IOException cause) {
+ super("Rename of " + oldFile + " to " + newFile + " failed", cause);
+ this.oldFile = oldFile;
+ this.newFile = newFile;
}
}
}