You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:10 UTC

[09/24] storm git commit: Functionally complete. Not well tested. Have some UTs

Functionally complete. Not well tested. Have some UTs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60e7a812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60e7a812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60e7a812

Branch: refs/heads/1.x-branch
Commit: 60e7a8126aceb85fe194d1cf90818fcda696d60a
Parents: 6fcebe6
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Wed Dec 9 13:10:32 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:55 2016 -0800

----------------------------------------------------------------------
 .../hdfs/common/CmpFilesByModificationTime.java |  14 +
 .../org/apache/storm/hdfs/common/HdfsUtils.java |  57 ++
 .../storm/hdfs/spout/AbstractFileReader.java    |  71 ++
 .../org/apache/storm/hdfs/spout/Configs.java    |  44 ++
 .../org/apache/storm/hdfs/spout/DirLock.java    |  74 +++
 .../org/apache/storm/hdfs/spout/FileLock.java   | 263 ++++++++
 .../org/apache/storm/hdfs/spout/FileOffset.java |  36 ++
 .../org/apache/storm/hdfs/spout/FileReader.java |  49 ++
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 645 +++++++++++++++++++
 .../apache/storm/hdfs/spout/ParseException.java |  26 +
 .../storm/hdfs/spout/ProgressTracker.java       |  67 ++
 .../storm/hdfs/spout/SequenceFileReader.java    | 227 +++++++
 .../apache/storm/hdfs/spout/TextFileReader.java | 168 +++++
 .../apache/storm/hdfs/spout/TestDirLock.java    | 143 ++++
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 465 +++++++++++++
 .../storm/hdfs/spout/TestProgressTracker.java   | 108 ++++
 16 files changed, 2457 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
new file mode 100644
index 0000000..d194558
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
@@ -0,0 +1,14 @@
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import java.util.Comparator;
+
+
+/**
+ * Orders files by their modification time, oldest first.
+ */
+public class CmpFilesByModificationTime
+        implements Comparator<LocatedFileStatus> {
+  /**
+   * Compares the modification timestamps of two files.
+   *
+   * @param o1 status of the first file
+   * @param o2 status of the second file
+   * @return a negative value, zero or a positive value if o1 was last modified
+   *         before, at the same time as, or after o2
+   */
+  @Override
+  public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
+    // Compare o1 against o2 (comparing o1 with itself always yields 0 and defeats sorting).
+    // Long.compare avoids allocating a boxed Long per comparison.
+    return Long.compare(o1.getModificationTime(), o2.getModificationTime());
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
new file mode 100644
index 0000000..344adf1
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -0,0 +1,57 @@
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/** Helper routines for working with files on HDFS. */
+public class HdfsUtils {
+  /**
+   * Lists the files directly inside 'directory' (non-recursive), sorted by modification
+   * time, keeping only those that have not been modified since 'olderThan'. If
+   * 'olderThan' is <= 0 then the filtering is disabled and every file is returned.
+   *
+   * @param fs         file system that hosts the directory
+   * @param directory  directory to list
+   * @param olderThan  cut-off modification time in millis; <= 0 disables filtering
+   * @return paths of the selected files, in the order imposed by CmpFilesByModificationTime
+   * @throws IOException if the directory listing fails
+   */
+  public static Collection<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
+          throws IOException {
+    ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
+
+    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false);
+    while (itr.hasNext()) {
+      LocatedFileStatus fileStatus = itr.next();
+      // keep the file when filtering is off, or when it was last modified at/before the cut-off
+      if (olderThan <= 0 || fileStatus.getModificationTime() <= olderThan) {
+        fstats.add(fileStatus);
+      }
+    }
+    Collections.sort(fstats, new CmpFilesByModificationTime());
+
+    ArrayList<Path> result = new ArrayList<>(fstats.size());
+    for (LocatedFileStatus fstat : fstats) {
+      result.add(fstat.getPath());
+    }
+    return result;
+  }
+
+  /**
+   * Simple immutable holder for a key and a value.
+   *
+   * @param <K> type of the key
+   * @param <V> type of the value
+   */
+  public static class Pair<K,V> {
+    private final K key;
+    private final V value;
+
+    public Pair(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    /** @return the key given at construction */
+    public K getKey() {
+      return key;
+    }
+
+    /** @return the value given at construction */
+    public V getValue() {
+      return value;
+    }
+
+    /** Factory method; returns a properly parameterized Pair instead of a raw type. */
+    public static <K,V> Pair<K,V> of(K key, V value) {
+      return new Pair<>(key, value);
+    }
+  }  // class Pair
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
new file mode 100644
index 0000000..09dc0d3
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+abstract class AbstractFileReader implements FileReader {
+
+  private final Path file;
+  private final FileSystem fs;
+  private Fields fields;
+
+  public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) {
+    if (fs == null || file == null)
+      throw new IllegalArgumentException("file and filesystem args cannot be null");
+    this.fs = fs;
+    this.file = file;
+    this.fields = fieldNames;
+  }
+
+  @Override
+  public Path getFilePath() {
+    return file;
+  }
+
+
+  @Override
+  public Fields getOutputFields() {
+    return fields;
+  }
+
+  @Override
+  public void setFields(String... fieldNames) {
+    this.fields = new Fields(fieldNames);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    AbstractFileReader that = (AbstractFileReader) o;
+
+    return !(file != null ? !file.equals(that.file) : that.file != null);
+  }
+
+  @Override
+  public int hashCode() {
+    return file != null ? file.hashCode() : 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
new file mode 100644
index 0000000..66b8972
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+/**
+ * Names of the configuration settings understood by the HDFS spout, together with
+ * the default values for some of them.
+ */
+public class Configs {
+  public static final String READER_TYPE = "hdfsspout.reader.type";
+  // presumably the accepted values of READER_TYPE -- TODO confirm against the spout
+  public static final String TEXT = "text";
+  public static final String SEQ = "seq";
+
+  public static final String SOURCE_DIR = "hdfsspout.source.dir";         // dir from which to read files
+  public static final String ARCHIVE_DIR = "hdfsspout.archive.dir";        // completed files will be moved here
+  public static final String BAD_DIR = "hdfsspout.badfiles.dir";       // unparseable files will be moved here
+  public static final String LOCK_DIR = "hdfsspout.lock.dir";           // dir in which lock files will be created
+  public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count";       // commit after N records
+  public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec";         // commit after N secs
+  public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";
+  public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec";   // inactivity duration (secs) after which a lock becomes a candidate for reassignment to another spout
+  public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // whether clocks on machines in the Storm cluster are in sync
+
+  // default values
+  public static final String DEFAULT_LOCK_DIR = ".lock";
+  public static final int DEFAULT_COMMIT_FREQ_COUNT = 10000;
+  public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
+  public static final int DEFAULT_MAX_DUPLICATES = 100;
+  public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min, expressed in seconds
+  public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
+
+
+} // class Configs

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
new file mode 100644
index 0000000..ef02a8f
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DirLock {
+  private FileSystem fs;
+  private final Path lockFile;
+  public static final String DIR_LOCK_FILE = "DIRLOCK";
+  private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+  private DirLock(FileSystem fs, Path lockFile) throws IOException {
+    if( fs.isDirectory(lockFile) )
+      throw new IllegalArgumentException(lockFile.toString() + " is not a directory");
+    this.fs = fs;
+    this.lockFile = lockFile;
+  }
+
+  /** Returns null if somebody else has a lock
+   *
+   * @param fs
+   * @param dir  the dir on which to get a lock
+   * @return lock object
+   * @throws IOException if there were errors
+   */
+  public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
+    Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+    try {
+      FSDataOutputStream os = fs.create(lockFile, false);
+      if(log.isInfoEnabled()) {
+        log.info("Thread acquired dir lock  " + threadInfo() + " - lockfile " + lockFile);
+      }
+      os.close();
+      return new DirLock(fs, lockFile);
+    } catch (FileAlreadyExistsException e) {
+      return null;
+    }
+  }
+
+  private static String threadInfo () {
+    return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName();
+  }
+  public void release() throws IOException {
+    fs.delete(lockFile, false);
+    log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
+  }
+
+  public Path getLockFile() {
+    return lockFile;
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
new file mode 100644
index 0000000..f4a6813
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
+
+public class FileLock {
+
+  private final FileSystem fs;
+  private final String componentID;
+  private final Path lockFile;
+  private final FSDataOutputStream stream;
+  private LogEntry lastEntry;
+
+  private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+
+  private FileLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
+          throws IOException {
+    this.fs = fs;
+    String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
+    this.lockFile = new Path(lockFileName);
+    this.stream =  fs.create(lockFile);
+    this.componentID = spoutId;
+    logProgress("0", false);
+  }
+
+  private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry entry)
+          throws IOException {
+    this.fs = fs;
+    this.lockFile = lockFile;
+    this.stream =  fs.append(lockFile);
+    this.componentID = spoutId;
+    log.debug("Acquired abandoned lockFile {}", lockFile);
+    logProgress(entry.fileOffset, true);
+  }
+
+  public void heartbeat(String fileOffset) throws IOException {
+    logProgress(fileOffset, true);
+  }
+
+  // new line is at beginning of each line (instead of end) for better recovery from
+  // partial writes of prior lines
+  private void logProgress(String fileOffset, boolean prefixNewLine)
+          throws IOException {
+    long now = System.currentTimeMillis();
+    LogEntry entry = new LogEntry(now, componentID, fileOffset);
+    String line = entry.toString();
+    if(prefixNewLine)
+      stream.writeBytes(System.lineSeparator() + line);
+    else
+      stream.writeBytes(line);
+    stream.flush();
+    lastEntry = entry; // update this only after writing to hdfs
+  }
+
+  public void release() throws IOException {
+    stream.close();
+    fs.delete(lockFile, false);
+  }
+
+  // throws exception immediately if not able to acquire lock
+  public static FileLock tryLock(FileSystem hdfs, Path fileToLock, Path lockDirPath, String spoutId)
+          throws IOException {
+    return new FileLock(hdfs, fileToLock, lockDirPath, spoutId);
+  }
+
+  /**
+   * checks if lockFile is older than 'olderThan' UTC time by examining the modification time
+   * on file and (if necessary) the timestamp in last log entry in the file. If its stale, then
+   * returns the last log entry, else returns null.
+   * @param fs
+   * @param lockFile
+   * @param olderThan  time (millis) in UTC.
+   * @return the last entry in the file if its too old. null if last entry is not too old
+   * @throws IOException
+   */
+  public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
+          throws IOException {
+    if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) {
+      // HDFS timestamp may not reflect recent updates, so we double check the
+      // timestamp in last line of file to see when the last update was made
+      LogEntry lastEntry =  getLastEntry(fs, lockFile);
+      if(lastEntry==null) {
+        throw new RuntimeException(lockFile.getName() + " is empty. this file is invalid.");
+      }
+      if( lastEntry.eventTime <= olderThan )
+        return lastEntry;
+    }
+    return null;
+  }
+
+  /**
+   * returns the last log entry
+   * @param fs
+   * @param lockFile
+   * @return
+   * @throws IOException
+   */
+  public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
+          throws IOException {
+    FSDataInputStream in = fs.open(lockFile);
+    BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+    String lastLine = null;
+    for(String line = reader.readLine(); line!=null; line = reader.readLine() ) {
+      lastLine=line;
+    }
+    return LogEntry.deserialize(lastLine);
+  }
+
+  // takes ownership of the lock file
+
+  /**
+   * Takes ownership of the lock file.
+   * @param lockFile
+   * @param lastEntry   last entry in the lock file. this param is an optimization.
+   *                    we dont scan the lock file again to find its last entry here since
+   *                    its already been done once by the logic used to check if the lock
+   *                    file is stale. so this value comes from that earlier scan.
+   * @param spoutId     spout id
+   * @return
+   */
+  public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
+          throws IOException {
+    return new FileLock(fs, lockFile, spoutId, lastEntry);
+  }
+
+  /**
+   * Finds a oldest expired lock file (using modification timestamp), then takes
+   * ownership of the lock file
+   * Impt: Assumes access to lockFilesDir has been externally synchronized such that
+   *       only one thread accessing the same thread
+   * @param fs
+   * @param lockFilesDir
+   * @param locktimeoutSec
+   * @return
+   */
+  public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+          throws IOException {
+    // list files
+    long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+    Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
+
+    // locate oldest expired lock file (if any) and take ownership
+    for (Path file : listing) {
+      if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
+        continue;
+      LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+      if(lastEntry!=null)
+        return FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+    }
+    log.info("No abandoned files found");
+    return null;
+  }
+
+
+  /**
+   * Finds oldest expired lock file (using modification timestamp), then takes
+   * ownership of the lock file
+   * Impt: Assumes access to lockFilesDir has been externally synchronized such that
+   *       only one thread accessing the same thread
+   * @param fs
+   * @param lockFilesDir
+   * @param locktimeoutSec
+   * @param spoutId
+   * @return a Pair<lock file path, last entry in lock file> .. if expired lock file found
+   * @throws IOException
+   */
+  public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+          throws IOException {
+    // list files
+    long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+    Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
+
+    // locate oldest expired lock file (if any) and take ownership
+    for (Path file : listing) {
+      if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
+        continue;
+      LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+      if(lastEntry!=null)
+        return new HdfsUtils.Pair<>(file, lastEntry);
+    }
+    log.info("No abandoned files found");
+    return null;
+  }
+
+  public LogEntry getLastLogEntry() {
+    return lastEntry;
+  }
+
+  public Path getLockFile() {
+    return lockFile;
+  }
+
+  public static class LogEntry {
+    private static final int NUM_FIELDS = 3;
+    public final long eventTime;
+    public final String componentID;
+    public final String fileOffset;
+
+    public LogEntry(long eventtime, String componentID, String fileOffset) {
+      this.eventTime = eventtime;
+      this.componentID = componentID;
+      this.fileOffset = fileOffset;
+    }
+
+    public String toString() {
+      return eventTime + "," + componentID + "," + fileOffset;
+    }
+    public static LogEntry deserialize(String line) {
+      String[] fields = line.split(",", NUM_FIELDS);
+      return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof LogEntry)) return false;
+
+      LogEntry logEntry = (LogEntry) o;
+
+      if (eventTime != logEntry.eventTime) return false;
+      if (!componentID.equals(logEntry.componentID)) return false;
+      return fileOffset.equals(logEntry.fileOffset);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (eventTime ^ (eventTime >>> 32));
+      result = 31 * result + componentID.hashCode();
+      result = 31 * result + fileOffset.hashCode();
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
new file mode 100644
index 0000000..ea8c1e1
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+/**
+ * Represents the notion of an offset in a file. The idea is to accommodate representing
+ * file offsets other than a simple byte offset, as that may be insufficient for certain
+ * formats. The reader for each format implements this as appropriate for its needs.
+ * Note: Derived types must :
+ *       - implement equals() & hashCode() appropriately.
+ *       - implement Comparable<> appropriately.
+ *       - implement toString() appropriately for serialization.
+ *       - provide a constructor(String) for deserialization
+ */
+
+interface FileOffset extends Comparable<FileOffset>, Cloneable {
+  /** tests if rhs == currOffset+1, i.e. whether rhs is the offset immediately following this one */
+  boolean isNextOffset(FileOffset rhs);
+  /** narrows the return type of Object.clone(); implementations are expected to return a copy of this offset */
+  public FileOffset clone();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
new file mode 100644
index 0000000..78284cf
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Contract for readers that turn the contents of a file into tuples. */
+interface FileReader {
+  /** @return path of the file being read */
+  public Path getFilePath();
+
+  /**
+   * A simple numeric value may not be sufficient for certain formats, consequently
+   * the offset is represented by a FileOffset.
+   */
+  public FileOffset getFileOffset();
+
+  /**
+   * Get the next tuple from the file
+   *
+   * @return null if no more data
+   * @throws IOException
+   * @throws ParseException
+   */
+  public List<Object> next() throws IOException, ParseException;
+
+  /** @return the output fields of this reader */
+  public Fields getOutputFields();
+
+  /** Sets the names of the output fields. */
+  public void setFields(String... fieldNames);
+
+  public void close();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
new file mode 100644
index 0000000..2d4afdb
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -0,0 +1,645 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import backtype.storm.Config;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+
+public class HdfsSpout extends BaseRichSpout {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
+
+  private Path sourceDirPath;
+  private Path archiveDirPath;
+  private Path badFilesDirPath;
+  private Path lockDirPath;
+
+  private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
+  private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
+  private int maxDuplicates = Configs.DEFAULT_MAX_DUPLICATES;
+  private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
+  private boolean clocksInSync = true;
+
+  private ProgressTracker tracker = new ProgressTracker();
+
+  private FileSystem hdfs;
+  private FileReader reader;
+
+  private SpoutOutputCollector collector;
+  HashMap<MessageId, List<Object> > inflight = new HashMap<>();
+  LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>();
+
+  private String inprogress_suffix = ".inprogress";
+
+  private Configuration hdfsConfig;
+  private String readerType;
+
+  private Map conf = null;
+  private FileLock lock;
+  private String spoutId = null;
+
+  HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null;
+  private long lastExpiredLockTime = 0;
+
+  private long tupleCounter = 0;
+  private boolean ackEnabled = false;
+  private int acksSinceLastCommit = 0 ;
+  private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
+  private final Timer commitTimer = new Timer();
+  private boolean fileReadCompletely = false;
+
+  private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs
+
+  /** Creates an unconfigured spout; settings are read later in open(). */
+  public HdfsSpout() {
+  }
+
+  /**
+   * @return the lock directory path; it is assigned in open(), so this is
+   *         null before open() has run
+   */
+  public Path getLockDirPath() {
+    return this.lockDirPath;
+  }
+
+  /**
+   * @return the collector handed to open(), or null before open() has run
+   */
+  public SpoutOutputCollector getCollector() {
+    return this.collector;
+  }
+
+  /**
+   * Overrides the key under which open() looks up the map of HDFS settings
+   * in the topology configuration.
+   *
+   * @param configKey key of the nested HDFS settings map
+   * @return this spout, to allow call chaining
+   */
+  public HdfsSpout withConfigKey(String configKey){
+    this.configKey = configKey;
+    return this;
+  }
+
+  /**
+   * Emits at most one tuple per invocation.
+   * <ol>
+   *   <li>Previously failed tuples waiting in retryList are re-emitted first.</li>
+   *   <li>With ACKing enabled, nothing is emitted while the progress tracker
+   *       holds maxDuplicates or more entries.</li>
+   *   <li>Otherwise the next record of the current file is emitted, opening a
+   *       new file via pickNextFile() when none is open.</li>
+   * </ol>
+   * When the current file is exhausted it is archived right away if ACKing is
+   * disabled or no tuple is outstanding; otherwise the method returns without
+   * emitting so that pending ACKs can be processed, instead of polling the
+   * exhausted reader in a tight loop.
+   */
+  public void nextTuple() {
+    LOG.debug("Next Tuple");
+    // 1) First re-emit any previously failed tuples (from retryList)
+    if (!retryList.isEmpty()) {
+      LOG.debug("Sending from retry list");
+      HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove();
+      emitData(pair.getValue(), pair.getKey());
+      return;
+    }
+
+    if( ackEnabled  &&  tracker.size()>=maxDuplicates ) {
+      LOG.warn("Waiting for more ACKs before generating new tuples. " +
+               "Progress tracker size has reached limit {}"
+              , maxDuplicates);
+      // Don't emit anything .. allow configured spout wait strategy to kick in
+      return;
+    }
+
+    // 2) If no failed tuples, then send tuples from hdfs
+    while (true) {
+      try {
+        // 3) Select a new file if one is not open already
+        if (reader == null) {
+          reader = pickNextFile();
+          if (reader == null) {
+            LOG.info("Currently no new files to process under : " + sourceDirPath);
+            return;
+          }
+          fileReadCompletely = false; // fresh file, nothing read yet
+        }
+
+        // 4) Read record from file, emit to collector and record progress
+        if (!fileReadCompletely) {
+          List<Object> tuple = reader.next();
+          if (tuple != null) {
+            ++tupleCounter;
+            MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset());
+            emitData(tuple, msgId);
+
+            if(!ackEnabled) {
+              ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode
+              commitProgress(reader.getFileOffset());
+            } else {
+              commitProgress(tracker.getCommitPosition());
+            }
+            return;
+          }
+          fileReadCompletely = true;
+        }
+
+        // 5) End of file. While tuples are still outstanding, return so that
+        //    ack()/fail() get a chance to run; do not spin on the reader.
+        if (ackEnabled && !inflight.isEmpty()) {
+          return;
+        }
+        // nothing outstanding: archive the file and loop to pick the next one
+        markFileAsDone(reader.getFilePath());
+      } catch (IOException e) {
+        LOG.error("I/O Error processing at file location " + getFileProgress(reader), e);
+        // don't emit anything .. allow configured spout wait strategy to kick in
+        return;
+      } catch (ParseException e) {
+        LOG.error("Parsing error when processing at file location " + getFileProgress(reader) +
+                ". Skipping remainder of file.", e);
+        markFileAsBad(reader.getFilePath());
+        // note: Unfortunately not emitting anything here due to parse error
+        // will trigger the configured spout wait strategy which is unnecessary
+      }
+    }
+
+  }
+
+  /**
+   * Commits progress into the lock file if a commit threshold (ack count or
+   * elapsed time, see canCommitNow()) has been reached. On success the ack
+   * counter and the elapsed-time flag are reset and the timer is re-armed.
+   *
+   * @param position offset to record; a null position (nothing to commit
+   *                 yet) is ignored
+   */
+  private void commitProgress(FileOffset position) {
+    if ( position == null )
+      return;
+    if ( lock!=null && canCommitNow() ) {
+      try {
+        lock.heartbeat(position.toString());
+        acksSinceLastCommit = 0;
+        commitTimeElapsed.set(false);
+        setupCommitElapseTimer();
+      } catch (IOException e) {
+        LOG.error("Unable to commit progress Will retry later.", e);
+      }
+    }
+  }
+
+  /**
+   * Schedules a one-shot task that raises the commitTimeElapsed flag after
+   * commitFrequencySec seconds. Does nothing when commitFrequencySec is not
+   * positive.
+   */
+  private void setupCommitElapseTimer() {
+    if(commitFrequencySec<=0)
+      return;
+    TimerTask timerTask = new TimerTask() {
+      @Override
+      public void run() {
+        // signal canCommitNow() that the time-based threshold was reached
+        commitTimeElapsed.set(true);
+      }
+    };
+    // long arithmetic so that large second values cannot overflow int
+    commitTimer.schedule(timerTask, commitFrequencySec * 1000L);
+  }
+
+
+  /**
+   * Builds a "path offset" description of the reader's current position,
+   * used in log messages.
+   */
+  private static String getFileProgress(FileReader reader) {
+    StringBuilder location = new StringBuilder();
+    location.append(reader.getFilePath());
+    location.append(" ");
+    location.append(reader.getFileOffset());
+    return location.toString();
+  }
+
+  /**
+   * Archives a fully consumed file, then closes the current reader and
+   * releases its lock. A failure to archive is logged and does not prevent
+   * the reader/lock cleanup.
+   *
+   * @param filePath the in-progress file that has been fully consumed
+   */
+  private void markFileAsDone(Path filePath) {
+    fileReadCompletely = false;
+    try {
+      renameCompletedFile(filePath);
+    } catch (IOException e) {
+      LOG.error("Unable to archive completed file " + filePath, e);
+    }
+    unlockAndCloseReader();
+  }
+
+  /**
+   * Moves a file that could not be parsed into the bad files directory
+   * (under its original name, i.e. without the in-progress suffix), then
+   * closes the current reader and releases its lock. A failed move is logged
+   * and does not prevent the reader/lock cleanup.
+   *
+   * @param file the file being read when the parse error occurred
+   */
+  private void markFileAsBad(Path file) {
+    String originalName = file.getName();
+    // the file may not carry the suffix (see getFileForLockFile), so only
+    // strip it when present instead of relying on indexOf() != -1
+    if (originalName.endsWith(inprogress_suffix)) {
+      originalName = originalName.substring(0, originalName.length() - inprogress_suffix.length());
+    }
+    Path  newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
+
+    LOG.info("Moving bad file to " + newFile);
+    try {
+      if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
+        throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
+      }
+    } catch (IOException e) {
+      LOG.warn("Error moving bad file: " + file + ". to destination :  " + newFile, e);
+    }
+
+    unlockAndCloseReader();
+  }
+
+  /**
+   * Closes the current reader and releases the lock held for its file. Both
+   * fields are reset to null afterwards. The lock is released even if closing
+   * the reader throws, and a missing reader or lock is tolerated.
+   */
+  private void unlockAndCloseReader() {
+    try {
+      if (reader != null) {
+        reader.close();
+      }
+    } finally {
+      reader = null;
+      if (lock != null) {
+        try {
+          lock.release();
+        } catch (IOException e) {
+          LOG.error("Unable to delete lock file : " + this.lock.getLockFile(), e);
+        }
+        lock = null;
+      }
+    }
+  }
+
+
+
+  /**
+   * Emits a tuple anchored to the given message id and remembers it as
+   * in-flight so that it can be replayed from fail().
+   *
+   * @param tuple values to emit
+   * @param id    message id used for ack/fail callbacks
+   */
+  protected void emitData(List<Object> tuple, MessageId id) {
+    LOG.debug("Emitting - {}", id);
+    // Register before emitting: if the collector delivers ack()/fail() for
+    // this id before emit() returns (presumably the case when the topology
+    // runs without ackers - confirm), the entry must already be present,
+    // otherwise it would be added after its removal and never cleaned up.
+    inflight.put(id, tuple);
+    this.collector.emit(tuple, id);
+  }
+
+  public void open(Map conf, TopologyContext context,  SpoutOutputCollector collector) {
+    this.conf = conf;
+    final String FILE_SYSTEM = "filesystem";
+    LOG.info("Opening");
+    this.collector = collector;
+    this.hdfsConfig = new Configuration();
+    this.tupleCounter = 0;
+
+    for( Object k : conf.keySet() ) {
+      String key = k.toString();
+      if( ! FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test only
+        String val = conf.get(key).toString();
+        LOG.info("Config setting : " + key + " = " + val);
+        this.hdfsConfig.set(key, val);
+      }
+      else
+        this.hdfs = (FileSystem) conf.get(key);
+
+      if(key.equalsIgnoreCase(Configs.READER_TYPE)) {
+        readerType = conf.get(key).toString();
+        checkValidReader(readerType);
+      }
+    }
+
+    // - Hdfs configs
+    this.hdfsConfig = new Configuration();
+    Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
+    if(map != null){
+      for(String key : map.keySet()){
+        this.hdfsConfig.set(key, String.valueOf(map.get(key)));
+      }
+    }
+
+    try {
+      HdfsSecurityUtil.login(conf, hdfsConfig);
+    } catch (IOException e) {
+      LOG.error("Failed to open " + sourceDirPath);
+      throw new RuntimeException(e);
+    }
+
+    // -- source dir config
+    if ( !conf.containsKey(Configs.SOURCE_DIR) ) {
+      LOG.error(Configs.SOURCE_DIR + " setting is required");
+      throw new RuntimeException(Configs.SOURCE_DIR + " setting is required");
+    }
+    this.sourceDirPath = new Path( conf.get(Configs.SOURCE_DIR).toString() );
+
+    // -- archive dir config
+    if ( !conf.containsKey(Configs.ARCHIVE_DIR) ) {
+      LOG.error(Configs.ARCHIVE_DIR + " setting is required");
+      throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
+    }
+    this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() );
+
+    try {
+      if(hdfs.exists(archiveDirPath)) {
+        if(! hdfs.isDirectory(archiveDirPath) ) {
+          LOG.error("Archive directory is a file. " + archiveDirPath);
+          throw new RuntimeException("Archive directory is a file. " + archiveDirPath);
+        }
+      } else if(! hdfs.mkdirs(archiveDirPath) ) {
+        LOG.error("Unable to create archive directory. " + archiveDirPath);
+        throw new RuntimeException("Unable to create archive directory " + archiveDirPath);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to create archive dir ", e);
+      throw new RuntimeException("Unable to create archive directory ", e);
+    }
+
+    // -- bad files dir config
+    if ( !conf.containsKey(Configs.BAD_DIR) ) {
+      LOG.error(Configs.BAD_DIR + " setting is required");
+      throw new RuntimeException(Configs.BAD_DIR + " setting is required");
+    }
+
+    this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString());
+
+    try {
+      if(hdfs.exists(badFilesDirPath)) {
+        if(! hdfs.isDirectory(badFilesDirPath) ) {
+          LOG.error("Bad files directory is a file: " + badFilesDirPath);
+          throw new RuntimeException("Bad files directory is a file: " + badFilesDirPath);
+        }
+      } else if(! hdfs.mkdirs(badFilesDirPath) ) {
+        LOG.error("Unable to create directory for bad files: " + badFilesDirPath);
+        throw new RuntimeException("Unable to create a directory for bad files: " + badFilesDirPath);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to create archive dir ", e);
+      throw new RuntimeException(e.getMessage(), e);
+    }
+
+    // -- lock dir config
+    String lockDir = !conf.containsKey(Configs.LOCK_DIR) ? getDefaultLockDir(sourceDirPath) : conf.get(Configs.LOCK_DIR).toString() ;
+    this.lockDirPath = new Path(lockDir);
+
+    try {
+      if(hdfs.exists(lockDirPath)) {
+        if(! hdfs.isDirectory(lockDirPath) ) {
+          LOG.error("Lock directory is a file: " + lockDirPath);
+          throw new RuntimeException("Lock directory is a file: " + lockDirPath);
+        }
+      } else if(! hdfs.mkdirs(lockDirPath) ) {
+        LOG.error("Unable to create lock directory: " + lockDirPath);
+        throw new RuntimeException("Unable to create lock directory: " + lockDirPath);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to create lock dir: " + lockDirPath, e);
+      throw new RuntimeException(e.getMessage(), e);
+    }
+
+    // -- lock timeout
+    if( conf.get(Configs.LOCK_TIMEOUT) !=null )
+      this.lockTimeoutSec =  Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString());
+
+    // -- enable/disable ACKing
+    Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+    if( ackers!=null )
+      this.ackEnabled = ( Integer.parseInt( ackers.toString() ) > 0 );
+    else
+      this.ackEnabled = false;
+
+    // -- commit frequency - count
+    if( conf.get(Configs.COMMIT_FREQ_COUNT) != null )
+      commitFrequencyCount = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_COUNT).toString() );
+
+    // -- commit frequency - seconds
+    if( conf.get(Configs.COMMIT_FREQ_SEC) != null )
+      commitFrequencySec = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_SEC).toString() );
+
+    // -- max duplicate
+    if( conf.get(Configs.MAX_DUPLICATE) !=null )
+      maxDuplicates = Integer.parseInt( conf.get(Configs.MAX_DUPLICATE).toString() );
+
+    // -- clocks in sync
+    if( conf.get(Configs.CLOCKS_INSYNC) !=null )
+      clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
+
+    // -- spout id
+    spoutId = context.getThisComponentId();
+
+    // setup timer for commit elapse time tracking
+    setupCommitElapseTimer();
+  }
+
+  /**
+   * @return the lock directory used when none is configured: the
+   *         Configs.DEFAULT_LOCK_DIR entry directly under the source directory
+   */
+  private String getDefaultLockDir(Path sourceDirPath) {
+    StringBuilder lockDir = new StringBuilder(sourceDirPath.toString());
+    lockDir.append(Path.SEPARATOR);
+    lockDir.append(Configs.DEFAULT_LOCK_DIR);
+    return lockDir.toString();
+  }
+
+  /**
+   * Verifies that the reader type is either one of the built-in types
+   * (Configs.TEXT, Configs.SEQ; case-insensitive) or the name of a class on
+   * the classpath that has a (FileSystem, Path, Map) constructor.
+   *
+   * @param readerType built-in type name or fully qualified class name
+   * @throws IllegalArgumentException if the class cannot be found or lacks
+   *         the expected constructor; the underlying exception is the cause
+   */
+  private static void checkValidReader(String readerType) {
+    if(readerType.equalsIgnoreCase(Configs.TEXT)  || readerType.equalsIgnoreCase(Configs.SEQ) )
+      return;
+    try {
+      Class<?> classType = Class.forName(readerType);
+      classType.getConstructor(FileSystem.class, Path.class, Map.class);
+    } catch (ClassNotFoundException e) {
+      LOG.error(readerType + " not found in classpath.", e);
+      throw new IllegalArgumentException(readerType + " not found in classpath.", e);
+    } catch (NoSuchMethodException e) {
+      LOG.error(readerType + " is missing the expected constructor for Readers.", e);
+      throw new IllegalArgumentException(readerType + " is missing the expected constructor for Readers.", e);
+    }
+  }
+
+  /**
+   * Records a successfully processed tuple: removes it from the in-flight
+   * set, feeds its offset to the progress tracker and commits progress if a
+   * commit threshold was reached. Once the current file has been read
+   * completely, it is archived only when no tuple is in flight or waiting in
+   * retryList any more.
+   *
+   * @param msgId the MessageId that was passed to the collector on emit
+   */
+  @Override
+  public void ack(Object msgId) {
+    MessageId id = (MessageId) msgId;
+    inflight.remove(id);
+    ++acksSinceLastCommit;
+    tracker.recordAckedOffset(id.offset);
+    commitProgress(tracker.getCommitPosition());
+    // archive only after the last outstanding tuple has been acked;
+    // otherwise a later fail() would replay data of an already archived file
+    if( fileReadCompletely && reader != null && inflight.isEmpty() && retryList.isEmpty() ) {
+      markFileAsDone(reader.getFilePath());
+      reader = null;
+    }
+    super.ack(msgId);
+  }
+
+  private boolean canCommitNow() {
+    if( acksSinceLastCommit >= commitFrequencyCount )
+      return true;
+    return commitTimeElapsed.get();
+  }
+
+  /**
+   * Queues a failed tuple for re-emission by nextTuple(). A message id that
+   * is not (or no longer) in flight is ignored, so that no null tuple ends up
+   * in retryList.
+   *
+   * @param msgId the MessageId that was passed to the collector on emit
+   */
+  @Override
+  public void fail(Object msgId) {
+    super.fail(msgId);
+    List<Object> tuple = inflight.remove(msgId);
+    if( tuple == null ) {
+      LOG.warn("Ignoring fail for unknown message id {}", msgId);
+      return;
+    }
+    HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, tuple);
+    retryList.add(item);
+  }
+
+  /**
+   * Selects the next file to consume and stores the lock acquired for it in
+   * the lock field.
+   * <ol>
+   *   <li>If an abandoned (expired) lock can be taken over, resumes the
+   *       corresponding file from the offset recorded in the lock file.</li>
+   *   <li>Otherwise walks the source directory listing, skipping in-progress
+   *       files, and takes the first file that can be locked and renamed.</li>
+   * </ol>
+   *
+   * @return a reader for the selected file, or null if there is currently
+   *         nothing to process or an I/O error occurred (which is logged)
+   */
+  private FileReader pickNextFile()  {
+    try {
+      // 1) If there are any abandoned files, pick oldest one
+      lock = getOldestExpiredLock();
+      if (lock != null) {
+        Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
+        if (file == null) {
+          // report clearly instead of handing a null path to the reader
+          throw new IOException("No data file found for abandoned lock file " + lock.getLockFile());
+        }
+        String resumeFromOffset = lock.getLastLogEntry().fileOffset;
+        LOG.info("Processing abandoned file : {}", file);
+        return createFileReader(file, resumeFromOffset);
+      }
+
+      // 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it
+      Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
+
+      for (Path file : listing) {
+        if( file.getName().contains(inprogress_suffix) )
+          continue;
+        LOG.info("Processing : {} ", file);
+        lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
+        if( lock==null ) {
+          LOG.info("Unable to get lock, so skipping file: {}", file);
+          continue; // could not lock, so try another file.
+        }
+        Path newFile;
+        try {
+          newFile = renameSelectedFile(file);
+        } catch (IOException e) {
+          // file still carries its original name: give the lock back right
+          // away instead of holding it until it expires, then try another file
+          LOG.error("Unable to rename file " + file + ", skipping it", e);
+          try {
+            lock.release();
+          } catch (IOException e2) {
+            LOG.error("Unable to delete lock file : " + lock.getLockFile(), e2);
+          }
+          lock = null;
+          continue;
+        }
+        return createFileReader(newFile);
+      }
+
+      return null;
+    } catch (IOException e) {
+      LOG.error("Unable to select next file for consumption " + sourceDirPath, e);
+      return null;
+    }
+  }
+
+  /**
+   * Tries to take over the lock of an abandoned file. The directory lock is
+   * held for the duration of the call and always released before returning.
+   * <p/>
+   * If clocks are in sync, acquires the oldest expired lock right away.
+   * Else, on the first call it only remembers the oldest expired lock (and the
+   * local time at which it was noted) and returns null. On a later call, once
+   * lockTimeoutSec has passed since then, it re-reads the last entry of that
+   * lock file: if the entry is unchanged the lock is taken over, otherwise the
+   * remembered lock is forgotten.
+   *
+   * @return the lock now owned by this spout, or null if the directory lock
+   *         could not be obtained or no lock was taken over in this call
+   * @throws IOException
+   */
+  private FileLock getOldestExpiredLock() throws IOException {
+    // 1 - acquire lock on dir (give up for now if it cannot be obtained)
+    DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
+    if (dirlock == null)
+      return null;
+    try {
+      // 2 - if clocks are in sync then simply take ownership of the oldest expired lock
+      if (clocksInSync)
+        return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+
+      // 3 - if clocks are not in sync ..
+      if( lastExpiredLock == null ) {
+        // just make a note of the oldest expired lock now and check if it is still unmodified after lockTimeoutSec
+        lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+        lastExpiredLockTime = System.currentTimeMillis();
+        return null;
+      }
+      // NOTE: despite its name, hasExpired() is true while lockTimeoutSec has NOT yet elapsed since we noted the lock file, i.e. keep waiting
+      if( hasExpired(lastExpiredLockTime) )
+        return null;
+
+      // Waiting period is over: if the lock file is unchanged since we noted it, then own it
+      FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey());
+      if( lastEntry.equals(lastExpiredLock.getValue()) ) {
+        FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId);
+        lastExpiredLock = null;
+        return  result;
+      } else {
+        // if lock file has been updated since last time, then leave this lock file alone
+        lastExpiredLock = null;
+        return null;
+      }
+    } finally {
+      dirlock.release();
+    }
+  }
+
+  /**
+   * NOTE: the name is misleading. This returns true while LESS than
+   * lockTimeoutSec seconds have passed since lastModifyTime, i.e. while the
+   * timeout has not yet elapsed. getOldestExpiredLock() relies on exactly
+   * this meaning, so the semantics are kept as they are.
+   *
+   * @param lastModifyTime reference time in milliseconds since the epoch
+   * @return true if the lock timeout has not yet elapsed since lastModifyTime
+   */
+  private boolean hasExpired(long lastModifyTime) {
+    // long arithmetic: lockTimeoutSec*1000 would overflow int for large timeouts
+    return (System.currentTimeMillis() - lastModifyTime ) < lockTimeoutSec * 1000L;
+  }
+
+  /**
+   * Instantiates the configured reader type positioned at the start of the
+   * file. The built-in sequence file and text readers are created directly;
+   * any other reader type is treated as a class name and created reflectively
+   * through its (FileSystem, Path, Map) constructor.
+   *
+   * @param file file to read
+   * @return a reader for the file
+   * @throws IOException
+   */
+  private FileReader createFileReader(Path file)
+          throws IOException {
+    if( readerType.equalsIgnoreCase(Configs.SEQ) ) {
+      return new SequenceFileReader(this.hdfs, file, conf);
+    } else if( readerType.equalsIgnoreCase(Configs.TEXT) ) {
+      return new TextFileReader(this.hdfs, file, conf);
+    }
+
+    // custom reader: look up the class by name and invoke its constructor
+    try {
+      Object customReader = Class.forName(readerType)
+              .getConstructor(FileSystem.class, Path.class, Map.class)
+              .newInstance(this.hdfs, file, conf);
+      return (FileReader) customReader;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException("Unable to instantiate " + readerType, e);
+    }
+  }
+
+
+  /**
+   * Instantiates the configured reader type positioned at 'offset'. The
+   * built-in sequence file and text readers are created directly; any other
+   * reader type is treated as a class name and created reflectively through
+   * its (FileSystem, Path, Map, String) constructor.
+   *
+   * @param file the file to read
+   * @param offset the offset string should be understandable by the reader type being used
+   * @return a reader for the file
+   * @throws IOException
+   */
+  private FileReader createFileReader(Path file, String offset)
+          throws IOException {
+    if( readerType.equalsIgnoreCase(Configs.SEQ) ) {
+      return new SequenceFileReader(this.hdfs, file, conf, offset);
+    } else if( readerType.equalsIgnoreCase(Configs.TEXT) ) {
+      return new TextFileReader(this.hdfs, file, conf, offset);
+    }
+
+    // custom reader: look up the class by name and invoke its constructor
+    try {
+      Object customReader = Class.forName(readerType)
+              .getConstructor(FileSystem.class, Path.class, Map.class, String.class)
+              .newInstance(this.hdfs, file, conf, offset);
+      return (FileReader) customReader;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new RuntimeException("Unable to instantiate " + readerType, e);
+    }
+  }
+
+  // returns new path of renamed file
+  private Path renameSelectedFile(Path file)
+          throws IOException {
+    Path newFile =  new Path( file.toString() + inprogress_suffix );
+    if( ! hdfs.rename(file, newFile) ) {
+      throw new IOException("Rename failed for file: " + file);
+    }
+    return newFile;
+  }
+
+  /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
+   *  The in-progress variant of the name (lock file name + in-progress suffix) is
+   *  looked up first, then the plain lock file name.
+   *  If no such file is found then returns null
+   */
+  private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
+          throws IOException {
+    String lockFileName = lockFile.getName();
+    // a separator is required between the directory and the file name,
+    // otherwise both get glued into a single (non existing) path element
+    String dataFilePrefix = sourceDirPath + Path.SEPARATOR + lockFileName;
+    Path dataFile = new Path(dataFilePrefix + inprogress_suffix);
+    if( hdfs.exists(dataFile) )
+      return dataFile;
+    dataFile = new Path(dataFilePrefix);
+    if(hdfs.exists(dataFile))
+      return dataFile;
+    return null;
+  }
+
+
+  /**
+   * Moves a fully consumed file into the archive directory under its
+   * original name, i.e. with the in-progress suffix removed if present.
+   *
+   * @param file the consumed file
+   * @return path of the file inside the archive directory
+   * @throws IOException if the rename is rejected by the file system
+   */
+  private Path renameCompletedFile(Path file) throws IOException {
+    String fileName = file.toString();
+    // the file may not carry the suffix (see getFileForLockFile), so only
+    // strip it when present instead of relying on indexOf() != -1
+    String fileNameMinusSuffix = fileName;
+    if( fileName.endsWith(inprogress_suffix) ) {
+      fileNameMinusSuffix = fileName.substring(0, fileName.length() - inprogress_suffix.length());
+    }
+    String newName = new Path(fileNameMinusSuffix).getName();
+
+    Path  newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
+    LOG.debug("Renaming complete file to " + newFile);
+    LOG.info("Completed file " + fileNameMinusSuffix );
+    if (!hdfs.rename(file, newFile) ) {
+      throw new IOException("Rename failed for file: " + file);
+    }
+    return newFile;
+  }
+
+  /**
+   * Declares the output fields reported by the current reader.
+   * <p/>
+   * NOTE(review): 'reader' is only assigned from nextTuple() (via
+   * pickNextFile()), so this dereferences null if it is invoked before a
+   * file has been opened - confirm when Storm calls this method.
+   */
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(reader.getOutputFields());
+  }
+
+  static class MessageId implements  Comparable<MessageId> {
+    public long msgNumber; // tracks order in which msg came in
+    public String fullPath;
+    public FileOffset offset;
+
+    public MessageId(long msgNumber, Path fullPath, FileOffset offset) {
+      this.msgNumber = msgNumber;
+      this.fullPath = fullPath.toString();
+      this.offset = offset;
+    }
+
+    @Override
+    public String toString() {
+      return "{'" +  fullPath + "':" + offset + "}";
+    }
+
+    @Override
+    public int compareTo(MessageId rhs) {
+      if (msgNumber<rhs.msgNumber)
+        return -1;
+      if(msgNumber>rhs.msgNumber)
+        return 1;
+      return 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
new file mode 100644
index 0000000..fdf7751f
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+/**
+ * Checked exception declared by FileReader.next() for data that cannot be
+ * parsed.
+ */
+public class ParseException extends Exception {
+
+  /**
+   * @param message description of the parse problem
+   * @param cause   underlying exception
+   */
+  public ParseException(final String message, final Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
new file mode 100644
index 0000000..2079ef4
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import java.io.PrintStream;
+import java.util.TreeSet;
+
+/**
+ * Keeps the acked file offsets in sorted order and collapses contiguous runs
+ * at the low end, so that the smallest remaining offset can be used as the
+ * commit position.
+ */
+public class ProgressTracker {
+
+  TreeSet<FileOffset> offsets = new TreeSet<>();
+
+  /**
+   * Adds an acked offset. Null offsets are ignored.
+   *
+   * @param newOffset offset of the acked tuple
+   */
+  public void recordAckedOffset(FileOffset newOffset) {
+    if(newOffset==null)
+      return;
+    offsets.add(newOffset);
+
+    FileOffset currHead = offsets.first();
+
+    if( currHead.isNextOffset(newOffset) ) { // check is a minor optimization
+      trimHead();
+    }
+  }
+
+  // remove contiguous elements from the head of the sorted set
+  // e.g.:  1,2,3,4,10,11,12,15  =>  4,10,11,12,15
+  // Iterative on purpose: a long contiguous run must not grow the call stack.
+  private void trimHead() {
+    while( offsets.size() > 1 ) {
+      FileOffset head = offsets.first();
+      FileOffset successor = offsets.higher(head);
+      if( !head.isNextOffset(successor) ) {
+        return;
+      }
+      offsets.pollFirst();
+    }
+  }
+
+  /**
+   * @return a copy of the smallest tracked offset, or null if nothing is
+   *         tracked yet
+   */
+  public FileOffset getCommitPosition() {
+    if(!offsets.isEmpty())
+      return offsets.first().clone();
+    return null;
+  }
+
+  /** Prints the tracked offsets to the given stream. */
+  public void dumpState(PrintStream stream) {
+    stream.println(offsets);
+  }
+
+  /** @return number of offsets currently tracked */
+  public int size() {
+    return offsets.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
new file mode 100644
index 0000000..5ff7b75
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+// Todo: Track file offsets instead of line number
+/**
+ * Reads key/value records from a Hadoop sequence file and tracks the position
+ * of the most recently returned record as a {@link SequenceFileReader.Offset}.
+ *
+ * @param <Key>   key type stored in the sequence file
+ * @param <Value> value type stored in the sequence file
+ */
+public class SequenceFileReader<Key extends Writable,Value extends Writable>
+        extends AbstractFileReader {
+  private static final Logger LOG = LoggerFactory
+          .getLogger(SequenceFileReader.class);
+  private static final int DEFAULT_BUFF_SIZE = 4096;
+  public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
+
+  private final SequenceFile.Reader reader;
+
+  private final SequenceFileReader.Offset offset;
+
+  private static final String DEFAULT_KEYNAME = "key";
+  private static final String DEFAULT_VALNAME = "value";
+
+  private String keyName;
+  private String valueName;
+
+
+  private final Key key;
+  private final Value value;
+
+
+  /**
+   * Opens {@code file} for reading from its beginning.
+   *
+   * @param fs   file system holding the file
+   * @param file sequence file to read
+   * @param conf settings; {@link #BUFFER_SIZE} is honoured, may be {@code null}
+   * @throws IOException if the underlying reader cannot be opened
+   */
+  @SuppressWarnings("unchecked") // key/value classes are dictated by the file header
+  public SequenceFileReader(FileSystem fs, Path file, Map conf)
+          throws IOException {
+    super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME));
+    this.keyName = DEFAULT_KEYNAME;
+    this.valueName = DEFAULT_VALNAME;
+    int bufferSize = readBufferSize(conf);
+    this.reader = new SequenceFile.Reader(fs.getConf(),  SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
+    this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() );
+    this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() );
+    this.offset = new SequenceFileReader.Offset(0,0,0);
+  }
+
+  /**
+   * Opens {@code file} and positions the reader at the sync point named in
+   * {@code offset}.
+   *
+   * @param fs     file system holding the file
+   * @param file   sequence file to read
+   * @param conf   settings; {@link #BUFFER_SIZE} is honoured, may be {@code null}
+   * @param offset textual offset as produced by {@link Offset#toString()}
+   * @throws IOException              if the file cannot be opened or positioned
+   * @throws IllegalArgumentException if {@code offset} cannot be parsed
+   */
+  @SuppressWarnings("unchecked") // key/value classes are dictated by the file header
+  public SequenceFileReader(FileSystem fs, Path file, Map conf, String offset)
+          throws IOException {
+    super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME));
+    this.keyName = DEFAULT_KEYNAME;
+    this.valueName = DEFAULT_VALNAME;
+    int bufferSize = readBufferSize(conf);
+    // Parse first so that a malformed offset fails before the file is opened.
+    this.offset = new SequenceFileReader.Offset(offset);
+    this.reader = new SequenceFile.Reader(fs.getConf(),  SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
+    // NOTE(review): only the sync point is restored here; the records that
+    // were already consumed after that sync point (recordsSinceLastSync) are
+    // not skipped. Confirm whether callers expect them to be re-read.
+    this.reader.sync(this.offset.lastSyncPoint);
+    this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() );
+    this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() );
+  }
+
+  // Resolves the read buffer size, falling back to the default when conf is
+  // null or does not carry the setting.
+  private static int readBufferSize(Map conf) {
+    if (conf == null || !conf.containsKey(BUFFER_SIZE)) {
+      return DEFAULT_BUFF_SIZE;
+    }
+    return Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+  }
+
+  public String getKeyName() {
+    return keyName;
+  }
+
+  /**
+   * Sets the output field name used for keys.
+   *
+   * @param name new field name, must not be {@code null}
+   * @throws IllegalArgumentException if {@code name} is {@code null}
+   */
+  public void setKeyName(String name) {
+    if (name == null) {
+      throw new IllegalArgumentException("keyName cannot be null");
+    }
+    this.keyName = name;
+    setFields(keyName, valueName);
+  }
+
+  public String getValueName() {
+    return valueName;
+  }
+
+  /**
+   * Sets the output field name used for values.
+   *
+   * @param name new field name, must not be {@code null}
+   * @throws IllegalArgumentException if {@code name} is {@code null}
+   */
+  public void setValueName(String name) {
+    if (name == null) {
+      throw new IllegalArgumentException("valueName cannot be null");
+    }
+    this.valueName = name;
+    setFields(keyName, valueName);
+  }
+
+  /**
+   * Reads the next record.
+   *
+   * The returned list holds this reader's own key and value instances, which
+   * are overwritten by the following call.
+   *
+   * @return a two element list (key, value), or {@code null} at end of file
+   * @throws IOException if the underlying reader fails
+   */
+  public List<Object> next() throws IOException, ParseException {
+    if( !reader.next(key, value) ) {
+      return null;
+    }
+    ArrayList<Object> result = new ArrayList<Object>(2);
+    Collections.addAll(result, key, value);
+    offset.increment(reader.syncSeen(), reader.getPosition() );
+    return result;
+  }
+
+  @Override
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+    }
+  }
+
+  /**
+   * @return a snapshot of the current offset. A copy is handed out because
+   *         the internal instance is mutated by every {@link #next()} call.
+   */
+  public Offset getFileOffset() {
+      return offset.clone();
+  }
+
+
+  /**
+   * Position inside a sequence file: the last sync point, the number of
+   * records read since that sync point, and a running record counter.
+   * Ordering and equality are based on the record counter only.
+   */
+  public static class Offset implements  FileOffset {
+    private long lastSyncPoint;
+    private long recordsSinceLastSync;
+    private long currentRecord;
+    private long currRecordEndOffset;
+    private long prevRecordEndOffset;
+
+    public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord) {
+      this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0 );
+    }
+
+    public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord
+                  , long currRecordEndOffset, long prevRecordEndOffset) {
+      this.lastSyncPoint = lastSyncPoint;
+      this.recordsSinceLastSync = recordsSinceLastSync;
+      this.currentRecord = currentRecord;
+      this.prevRecordEndOffset = prevRecordEndOffset;
+      this.currRecordEndOffset = currRecordEndOffset;
+    }
+
+    /**
+     * Parses the textual form produced by {@link #toString()}, e.g.
+     * {@code {sync=123:afterSync=345:record=67}}. Surrounding braces are
+     * optional and ',' is also accepted as a field separator.
+     *
+     * @param offset textual offset
+     * @throws IllegalArgumentException if the text cannot be interpreted
+     */
+    public Offset(String offset) {
+      try {
+        String text = offset.trim();
+        if (text.startsWith("{")) {
+          text = text.substring(1);
+        }
+        if (text.endsWith("}")) {
+          text = text.substring(0, text.length() - 1);
+        }
+        String[] parts = text.split("[:,]");
+        this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1].trim());
+        this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1].trim());
+        this.currentRecord = Long.parseLong(parts[2].split("=")[1].trim());
+        this.prevRecordEndOffset = 0;
+        this.currRecordEndOffset = 0;
+      } catch (Exception e) {
+        throw new IllegalArgumentException("'" + offset +
+                "' cannot be interpreted. It is not in expected format for SequenceFileReader." +
+                " Format e.g. {sync=123:afterSync=345:record=67}", e);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return '{' +
+              "sync=" + lastSyncPoint +
+              ":afterSync=" + recordsSinceLastSync +
+              ":record=" + currentRecord +
+              '}';
+    }
+
+    /**
+     * @return {@code true} only when {@code rhs} is a sequence file offset
+     *         whose record counter is exactly one greater than this one
+     */
+    @Override
+    public boolean isNextOffset(FileOffset rhs) {
+      if(rhs instanceof Offset) {
+        Offset other = ((Offset) rhs);
+        return other.currentRecord == currentRecord + 1;
+      }
+      return false;
+    }
+
+    @Override
+    public int compareTo(FileOffset o) {
+      Offset rhs = ((Offset) o);
+      if (currentRecord < rhs.currentRecord) {
+        return -1;
+      }
+      return (currentRecord == rhs.currentRecord) ? 0 : 1;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Offset)) return false;
+
+      Offset offset = (Offset) o;
+
+      return currentRecord == offset.currentRecord;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (currentRecord ^ (currentRecord >>> 32));
+    }
+
+    /**
+     * Advances this offset past one record.
+     *
+     * @param syncSeen        whether a sync marker was seen while reading the record
+     * @param newBytePosition reader position after the record
+     */
+    void increment(boolean syncSeen, long newBytePosition) {
+      if(!syncSeen) {
+        ++recordsSinceLastSync;
+      }  else {
+        recordsSinceLastSync = 1;
+        lastSyncPoint = prevRecordEndOffset;
+      }
+      ++currentRecord;
+      prevRecordEndOffset = currRecordEndOffset;
+      // The byte position belongs in the end-offset field; storing it in
+      // currentRecord would clobber the record counter and leave the end
+      // offsets (and therefore lastSyncPoint) stuck at zero.
+      currRecordEndOffset = newBytePosition;
+    }
+
+    @Override
+    public Offset clone() {
+      return new Offset(lastSyncPoint, recordsSinceLastSync, currentRecord, currRecordEndOffset, prevRecordEndOffset);
+    }
+
+  } //class Offset
+} //class

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
new file mode 100644
index 0000000..6e4a8b0
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+// Todo: Track file offsets instead of line number
+/**
+ * Reads a text file line by line and tracks, as a {@link TextFileReader.Offset},
+ * how many bytes and lines have been consumed so far.
+ */
+class TextFileReader extends AbstractFileReader {
+  public static final String CHARSET = "hdfsspout.reader.charset";
+  public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
+
+  public static final String DEFAULT_FIELD_NAME = "line";
+
+  private static final int DEFAULT_BUFF_SIZE = 4096;
+  private static final String DEFAULT_CHARSET = "UTF-8";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+
+  private final BufferedReader reader;
+  // Name of the charset used to decode the file; also used to size consumed text in bytes.
+  private final String charSet;
+  private final TextFileReader.Offset offset;
+
+  /**
+   * Opens {@code file} for reading from its beginning.
+   *
+   * @param fs   file system holding the file
+   * @param file text file to read
+   * @param conf settings; {@link #CHARSET} and {@link #BUFFER_SIZE} are honoured, may be {@code null}
+   * @throws IOException if the file cannot be opened or the charset is unsupported
+   */
+  public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException {
+    super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+    offset = new TextFileReader.Offset(0,0);
+    charSet = charsetName(conf);
+    reader = openReader(fs, file, 0, charSet, bufferSize(conf));
+  }
+
+  /**
+   * Opens {@code file} and seeks to the byte position named in {@code startOffset}.
+   *
+   * @param fs          file system holding the file
+   * @param file        text file to read
+   * @param conf        settings; {@link #CHARSET} and {@link #BUFFER_SIZE} are honoured, may be {@code null}
+   * @param startOffset textual offset as produced by {@link Offset#toString()}
+   * @throws IOException              if the file cannot be opened or positioned
+   * @throws IllegalArgumentException if {@code startOffset} cannot be parsed
+   */
+  public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException {
+    super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+    // Parse first so that a malformed offset fails before the file is opened.
+    offset = new TextFileReader.Offset(startOffset);
+    charSet = charsetName(conf);
+    reader = openReader(fs, file, offset.byteOffset, charSet, bufferSize(conf));
+  }
+
+  // Returns the configured charset name, or UTF-8 when none is configured.
+  private static String charsetName(Map conf) {
+    if (conf == null || !conf.containsKey(CHARSET)) {
+      return DEFAULT_CHARSET;
+    }
+    return conf.get(CHARSET).toString();
+  }
+
+  // Returns the configured buffer size, or the default when none is configured.
+  private static int bufferSize(Map conf) {
+    if (conf == null || !conf.containsKey(BUFFER_SIZE)) {
+      return DEFAULT_BUFF_SIZE;
+    }
+    return Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+  }
+
+  // Opens the file, optionally seeks, and wraps the stream in a buffered
+  // reader. The stream is closed again if any of those steps fails.
+  private static BufferedReader openReader(FileSystem fs, Path file, long startByte,
+                                           String charSet, int buffSz) throws IOException {
+    FSDataInputStream in = fs.open(file);
+    boolean ready = false;
+    try {
+      if (startByte != 0) {
+        in.seek(startByte);
+      }
+      BufferedReader result = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
+      ready = true;
+      return result;
+    } finally {
+      if (!ready) {
+        try {
+          in.close();
+        } catch (IOException ignored) {
+          // the failure that brought us here is already propagating
+        }
+      }
+    }
+  }
+
+  /**
+   * @return a snapshot of the current offset
+   */
+  public Offset getFileOffset() {
+    return offset.clone();
+  }
+
+  /**
+   * Reads the next line and advances the offset by the bytes consumed,
+   * including the line terminator.
+   *
+   * @return a single element list holding the line without its terminator,
+   *         or {@code null} at end of file
+   * @throws IOException if reading fails
+   */
+  public List<Object> next() throws IOException, ParseException {
+    String line = readLineAndAdvance();
+    if (line == null) {
+      return null;
+    }
+    return Collections.singletonList((Object) line);
+  }
+
+  // Reads one line character by character so that the terminator actually
+  // present in the file ("\n", "\r" or "\r\n") is known and can be included
+  // in the byte count; BufferedReader.readLine() discards that information.
+  // Assumes that encoding the consumed text with the configured charset
+  // yields the same number of bytes as were read from the file (true for
+  // UTF-8; charsets that emit a byte order mark on encode would overcount).
+  private String readLineAndAdvance() throws IOException {
+    StringBuilder consumed = new StringBuilder();
+    int textLength = -1;
+    int ch;
+    while ((ch = reader.read()) != -1) {
+      consumed.append((char) ch);
+      if (ch == '\n') {
+        textLength = consumed.length() - 1;
+        break;
+      }
+      if (ch == '\r') {
+        textLength = consumed.length() - 1;
+        reader.mark(1);
+        int following = reader.read();
+        if (following == '\n') {
+          consumed.append('\n');
+        } else if (following != -1) {
+          reader.reset();
+        }
+        break;
+      }
+    }
+    if (consumed.length() == 0) {
+      return null; // end of file
+    }
+    if (textLength < 0) {
+      textLength = consumed.length(); // last line has no terminator
+    }
+    offset.increment(consumed.toString().getBytes(charSet).length);
+    return consumed.substring(0, textLength);
+  }
+
+  @Override
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+    }
+  }
+
+  /**
+   * Position inside a text file expressed as a byte offset plus a line number.
+   * Ordering uses the line number only, while equality compares both fields.
+   */
+  public static class Offset implements FileOffset {
+    long byteOffset;
+    long lineNumber;
+
+    public Offset(long byteOffset, long lineNumber) {
+      this.byteOffset = byteOffset;
+      this.lineNumber = lineNumber;
+    }
+
+    /**
+     * Parses the textual form produced by {@link #toString()}, e.g.
+     * {@code {byte=123:line=5}}. Surrounding braces are optional.
+     *
+     * @param offset textual offset
+     * @throws IllegalArgumentException if the text cannot be interpreted
+     */
+    public Offset(String offset) {
+      try {
+        String text = offset.trim();
+        if (text.startsWith("{")) {
+          text = text.substring(1);
+        }
+        if (text.endsWith("}")) {
+          text = text.substring(0, text.length() - 1);
+        }
+        String[] parts = text.split(":");
+        this.byteOffset = Long.parseLong(parts[0].split("=")[1].trim());
+        this.lineNumber = Long.parseLong(parts[1].split("=")[1].trim());
+      } catch (Exception e) {
+        throw new IllegalArgumentException("'" + offset +
+                "' cannot be interpreted. It is not in expected format for TextFileReader." +
+                " Format e.g.  {byte=123:line=5}", e);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return '{' +
+              "byte=" + byteOffset +
+              ":line=" + lineNumber +
+              '}';
+    }
+
+    /**
+     * @return {@code true} when {@code rhs} is a text file offset for the
+     *         line directly after this one and lies further into the file
+     */
+    @Override
+    public boolean isNextOffset(FileOffset rhs) {
+      if(rhs instanceof Offset) {
+        Offset other = ((Offset) rhs);
+        return  other.byteOffset > byteOffset    &&
+                other.lineNumber == lineNumber+1;
+      }
+      return false;
+    }
+
+    @Override
+    public int compareTo(FileOffset o) {
+      Offset rhs = ((Offset)o);
+      if (lineNumber < rhs.lineNumber) {
+        return -1;
+      }
+      return (lineNumber == rhs.lineNumber) ? 0 : 1;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Offset)) return false;
+
+      Offset that = (Offset) o;
+
+      if (byteOffset != that.byteOffset)
+        return false;
+      return lineNumber == that.lineNumber;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (byteOffset ^ (byteOffset >>> 32));
+      result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32));
+      return result;
+    }
+
+    // Advances by one line and by delta bytes.
+    void increment(int delta) {
+      ++lineNumber;
+      byteOffset += delta;
+    }
+
+    @Override
+    public Offset clone() {
+      return new Offset(byteOffset, lineNumber);
+    }
+  } //class Offset
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
new file mode 100644
index 0000000..ea4b3a3
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Exercises {@link DirLock} with several threads competing for the lock on
+ * one directory of a mini HDFS cluster.
+ */
+public class TestDirLock {
+
+  // Upper bound on how long the test waits for a single locking thread.
+  private static final long THREAD_WAIT_MS = 6000;
+
+  static MiniDFSCluster.Builder builder;
+  static MiniDFSCluster hdfsCluster;
+  static FileSystem fs;
+  static String hdfsURI;
+  static Configuration conf = new  HdfsConfiguration();
+
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+  private Path lockDir = new Path("/tmp/lockdir");
+
+
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    builder = new MiniDFSCluster.Builder(new Configuration());
+    hdfsCluster = builder.build();
+    fs  = hdfsCluster.getFileSystem();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    try {
+      fs.close();
+    } finally {
+      // shut the cluster down even if closing the file system fails
+      hdfsCluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // A JUnit assertion, not a Java 'assert': the mkdirs call must run even
+    // when the JVM has assertions disabled.
+    Assert.assertTrue(fs.mkdirs(lockDir));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(lockDir, true);
+  }
+
+  /**
+   * Starts ten threads that each acquire and release the directory lock, then
+   * checks that all of them finished cleanly and no lock file is left behind.
+   */
+  @Test
+  public void testConcurrentLocking() throws Exception {
+//    -Dlog4j.configuration=config
+    Logger.getRootLogger().setLevel(Level.ERROR);
+    DirLockingThread[] thds = startThreads(10, lockDir );
+    for (DirLockingThread thd : thds) {
+      thd.start();
+    }
+    System.err.println("Thread creation complete");
+    for (DirLockingThread thd : thds) {
+      // join returns as soon as the thread ends, so no fixed sleep is needed
+      thd.join(THREAD_WAIT_MS);
+      if (thd.isAlive() || !thd.cleanExit) {
+        System.err.println(thd.getName() + " did not exit cleanly");
+      }
+      Assert.assertTrue(thd.getName() + " did not exit cleanly", thd.cleanExit);
+    }
+
+    Path lockFile = new Path(lockDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
+    Assert.assertFalse(fs.exists(lockFile));
+  }
+
+
+
+  // Creates (but does not start) the requested number of locking threads.
+  private DirLockingThread[] startThreads(int thdCount, Path dir)
+          throws IOException {
+    DirLockingThread[] result = new DirLockingThread[thdCount];
+    for (int i = 0; i < thdCount; i++) {
+      result[i] = new DirLockingThread(i, fs, dir);
+    }
+    return result;
+  }
+
+
+  /**
+   * Thread that retries {@link DirLock#tryLock} until it obtains the lock,
+   * releases it, and then flags a clean exit.
+   */
+  class DirLockingThread extends Thread {
+
+    private final FileSystem fs;
+    private final Path dir;
+    // written by this thread, read by the test thread
+    public volatile boolean cleanExit = false;
+
+    public DirLockingThread(int thdNum,FileSystem fs, Path dir) throws IOException {
+      // Name this thread; Thread.currentThread() here would be the thread
+      // that runs the constructor, i.e. the test thread.
+      super("DirLockingThread-" + thdNum);
+      this.fs = fs;
+      this.dir = dir;
+    }
+
+    @Override
+    public void run() {
+      try {
+        DirLock lock;
+        do {
+          lock = DirLock.tryLock(fs, dir);
+          if(lock==null) {
+            System.out.println("Retrying lock - " + Thread.currentThread().getId());
+          }
+        } while (lock==null);
+        lock.release();
+        cleanExit= true;
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+
+    }
+
+  }
+}