Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:55:55 UTC
[04/20] storm git commit: Functionally complete. Not well tested.
Have some UTs
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60e7a812
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60e7a812
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60e7a812
Branch: refs/heads/master
Commit: 60e7a8126aceb85fe194d1cf90818fcda696d60a
Parents: 6fcebe6
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Wed Dec 9 13:10:32 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:55 2016 -0800
----------------------------------------------------------------------
.../hdfs/common/CmpFilesByModificationTime.java | 14 +
.../org/apache/storm/hdfs/common/HdfsUtils.java | 57 ++
.../storm/hdfs/spout/AbstractFileReader.java | 71 ++
.../org/apache/storm/hdfs/spout/Configs.java | 44 ++
.../org/apache/storm/hdfs/spout/DirLock.java | 74 +++
.../org/apache/storm/hdfs/spout/FileLock.java | 263 ++++++++
.../org/apache/storm/hdfs/spout/FileOffset.java | 36 ++
.../org/apache/storm/hdfs/spout/FileReader.java | 49 ++
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 645 +++++++++++++++++++
.../apache/storm/hdfs/spout/ParseException.java | 26 +
.../storm/hdfs/spout/ProgressTracker.java | 67 ++
.../storm/hdfs/spout/SequenceFileReader.java | 227 +++++++
.../apache/storm/hdfs/spout/TextFileReader.java | 168 +++++
.../apache/storm/hdfs/spout/TestDirLock.java | 143 ++++
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 465 +++++++++++++
.../storm/hdfs/spout/TestProgressTracker.java | 108 ++++
16 files changed, 2457 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
new file mode 100644
index 0000000..d194558
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
@@ -0,0 +1,14 @@
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import java.util.Comparator;
+
+
+public class CmpFilesByModificationTime
+ implements Comparator<LocatedFileStatus> {
+ @Override
+ public int compare(LocatedFileStatus o1, LocatedFileStatus o2) {
+ return Long.compare( o1.getModificationTime(), o2.getModificationTime() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
new file mode 100644
index 0000000..344adf1
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -0,0 +1,57 @@
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+public class HdfsUtils {
+ /** Lists files sorted by modification time that were last modified before 'olderThan'.
+ * If 'olderThan' is <= 0 then the filtering is disabled */
+ public static Collection<Path> listFilesByModificationTime(FileSystem fs, Path directory, long olderThan)
+ throws IOException {
+ ArrayList<LocatedFileStatus> fstats = new ArrayList<>();
+
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(directory, false);
+ while( itr.hasNext() ) {
+ LocatedFileStatus fileStatus = itr.next();
+ if( olderThan<=0 || fileStatus.getModificationTime()<olderThan )
+ fstats.add(fileStatus);
+ }
+ Collections.sort(fstats, new CmpFilesByModificationTime() );
+
+ ArrayList<Path> result = new ArrayList<>(fstats.size());
+ for (LocatedFileStatus fstat : fstats) {
+ result.add(fstat.getPath());
+ }
+ return result;
+ }
+
+ public static class Pair<K,V> {
+ private K key;
+ private V value;
+ public Pair(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+
+ public static <K,V> Pair<K,V> of(K key, V value) {
+ return new Pair<>(key, value);
+ }
+ } // class Pair
+}
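
For reference, a rough usage sketch of the helper above (not part of this patch); the filesystem comes from the default Hadoop configuration and the "/data/in" paths are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.HdfsUtils;

    import java.io.IOException;
    import java.util.Collection;

    public class HdfsUtilsDemo {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration()); // picks up fs.defaultFS
        // files untouched for at least 5 minutes, oldest first
        long olderThan = System.currentTimeMillis() - 5 * 60 * 1000;
        Collection<Path> files =
            HdfsUtils.listFilesByModificationTime(fs, new Path("/data/in"), olderThan);
        for (Path p : files) {
          System.out.println(p);
        }
        // Pair is the small two-value holder used by FileLock later in this patch
        HdfsUtils.Pair<Path, Long> tagged = HdfsUtils.Pair.of(new Path("/data/in/part-0"), olderThan);
        System.out.println(tagged.getKey() + " @ " + tagged.getValue());
      }
    }
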
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
new file mode 100644
index 0000000..09dc0d3
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+abstract class AbstractFileReader implements FileReader {
+
+ private final Path file;
+ private final FileSystem fs;
+ private Fields fields;
+
+ public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) {
+ if (fs == null || file == null)
+ throw new IllegalArgumentException("file and filesystem args cannot be null");
+ this.fs = fs;
+ this.file = file;
+ this.fields = fieldNames;
+ }
+
+ @Override
+ public Path getFilePath() {
+ return file;
+ }
+
+
+ @Override
+ public Fields getOutputFields() {
+ return fields;
+ }
+
+ @Override
+ public void setFields(String... fieldNames) {
+ this.fields = new Fields(fieldNames);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AbstractFileReader that = (AbstractFileReader) o;
+
+ return !(file != null ? !file.equals(that.file) : that.file != null);
+ }
+
+ @Override
+ public int hashCode() {
+ return file != null ? file.hashCode() : 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
new file mode 100644
index 0000000..66b8972
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/Configs.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+public class Configs {
+ public static final String READER_TYPE = "hdfsspout.reader.type";
+ public static final String TEXT = "text";
+ public static final String SEQ = "seq";
+
+ public static final String SOURCE_DIR = "hdfsspout.source.dir"; // dir from which to read files
+ public static final String ARCHIVE_DIR = "hdfsspout.archive.dir"; // completed files will be moved here
+ public static final String BAD_DIR = "hdfsspout.badfiles.dir"; // unparseable files will be moved here
+ public static final String LOCK_DIR = "hdfsspout.lock.dir"; // dir in which lock files will be created
+ public static final String COMMIT_FREQ_COUNT = "hdfsspout.commit.count"; // commit after N records
+ public static final String COMMIT_FREQ_SEC = "hdfsspout.commit.sec"; // commit after N secs
+ public static final String MAX_DUPLICATE = "hdfsspout.max.duplicate";
+ public static final String LOCK_TIMEOUT = "hdfsspout.lock.timeout.sec"; // inactivity duration after which locks are considered candidates for being reassigned to another spout
+ public static final String CLOCKS_INSYNC = "hdfsspout.clocks.insync"; // if clocks on machines in the Storm cluster are in sync
+
+ public static final String DEFAULT_LOCK_DIR = ".lock";
+ public static final int DEFAULT_COMMIT_FREQ_COUNT = 10000;
+ public static final int DEFAULT_COMMIT_FREQ_SEC = 10;
+ public static final int DEFAULT_MAX_DUPLICATES = 100;
+ public static final int DEFAULT_LOCK_TIMEOUT = 5 * 60; // 5 min
+ public static final String DEFAULT_HDFS_CONFIG_KEY = "hdfs.config";
+
+
+} // class Configs
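
A sketch of how these keys might be populated (directory paths and frequencies here are hypothetical); as the spout's open() method later in this patch shows, SOURCE_DIR, ARCHIVE_DIR and BAD_DIR are required while the rest have defaults:

    import org.apache.storm.hdfs.spout.Configs;

    import java.util.HashMap;
    import java.util.Map;

    public class HdfsSpoutConfigDemo {
      static Map<String, Object> spoutConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Configs.READER_TYPE, Configs.TEXT);  // "text", "seq", or a FileReader class name
        conf.put(Configs.SOURCE_DIR, "/data/in");     // dir to read files from
        conf.put(Configs.ARCHIVE_DIR, "/data/done");  // completed files move here
        conf.put(Configs.BAD_DIR, "/data/bad");       // unparseable files move here
        conf.put(Configs.COMMIT_FREQ_COUNT, 5000);    // commit progress after 5000 ACKs ...
        conf.put(Configs.COMMIT_FREQ_SEC, 30);        // ... or after 30 secs, whichever comes first
        conf.put(Configs.LOCK_TIMEOUT, 10 * 60);      // idle locks become takeover candidates
        return conf;
      }
    }
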
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
new file mode 100644
index 0000000..ef02a8f
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/DirLock.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DirLock {
+ private FileSystem fs;
+ private final Path lockFile;
+ public static final String DIR_LOCK_FILE = "DIRLOCK";
+ private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+ private DirLock(FileSystem fs, Path lockFile) throws IOException {
+ if( fs.isDirectory(lockFile) )
+ throw new IllegalArgumentException(lockFile.toString() + " must not be a directory");
+ this.fs = fs;
+ this.lockFile = lockFile;
+ }
+
+ /** Returns null if somebody else has a lock
+ *
+ * @param fs
+ * @param dir the dir on which to get a lock
+ * @return lock object
+ * @throws IOException if there were errors
+ */
+ public static DirLock tryLock(FileSystem fs, Path dir) throws IOException {
+ Path lockFile = new Path(dir.toString() + Path.SEPARATOR_CHAR + DIR_LOCK_FILE );
+ try {
+ FSDataOutputStream os = fs.create(lockFile, false);
+ if(log.isInfoEnabled()) {
+ log.info("Thread acquired dir lock " + threadInfo() + " - lockfile " + lockFile);
+ }
+ os.close();
+ return new DirLock(fs, lockFile);
+ } catch (FileAlreadyExistsException e) {
+ return null;
+ }
+ }
+
+ private static String threadInfo () {
+ return "ThdId=" + Thread.currentThread().getId() + ", ThdName=" + Thread.currentThread().getName();
+ }
+ public void release() throws IOException {
+ fs.delete(lockFile, false);
+ log.info("Thread {} released dir lock {} ", threadInfo(), lockFile);
+ }
+
+ public Path getLockFile() {
+ return lockFile;
+ }
+}
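
A minimal sketch of the intended acquire/release pattern, assuming an existing FileSystem handle and a hypothetical caller:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.spout.DirLock;

    import java.io.IOException;

    public class DirLockDemo {
      // serialize access to a lock dir across competing spout instances
      static void withDirLock(FileSystem fs, Path lockDir) throws IOException {
        DirLock lock = DirLock.tryLock(fs, lockDir);
        if (lock == null) {
          return; // another spout holds the DIRLOCK file; try again later
        }
        try {
          // ... scan lockDir for expired file locks, etc. ...
        } finally {
          lock.release(); // deletes the DIRLOCK file
        }
      }
    }
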
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
new file mode 100644
index 0000000..f4a6813
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
+
+public class FileLock {
+
+ private final FileSystem fs;
+ private final String componentID;
+ private final Path lockFile;
+ private final FSDataOutputStream stream;
+ private LogEntry lastEntry;
+
+ private static final Logger log = LoggerFactory.getLogger(FileLock.class);
+
+ private FileLock(FileSystem fs, Path fileToLock, Path lockDirPath, String spoutId)
+ throws IOException {
+ this.fs = fs;
+ String lockFileName = lockDirPath.toString() + Path.SEPARATOR_CHAR + fileToLock.getName();
+ this.lockFile = new Path(lockFileName);
+ this.stream = fs.create(lockFile);
+ this.componentID = spoutId;
+ logProgress("0", false);
+ }
+
+ private FileLock(FileSystem fs, Path lockFile, String spoutId, LogEntry entry)
+ throws IOException {
+ this.fs = fs;
+ this.lockFile = lockFile;
+ this.stream = fs.append(lockFile);
+ this.componentID = spoutId;
+ log.debug("Acquired abandoned lockFile {}", lockFile);
+ logProgress(entry.fileOffset, true);
+ }
+
+ public void heartbeat(String fileOffset) throws IOException {
+ logProgress(fileOffset, true);
+ }
+
+ // new line is at beginning of each line (instead of end) for better recovery from
+ // partial writes of prior lines
+ private void logProgress(String fileOffset, boolean prefixNewLine)
+ throws IOException {
+ long now = System.currentTimeMillis();
+ LogEntry entry = new LogEntry(now, componentID, fileOffset);
+ String line = entry.toString();
+ if(prefixNewLine)
+ stream.writeBytes(System.lineSeparator() + line);
+ else
+ stream.writeBytes(line);
+ stream.flush();
+ lastEntry = entry; // update this only after writing to hdfs
+ }
+
+ public void release() throws IOException {
+ stream.close();
+ fs.delete(lockFile, false);
+ }
+
+ // throws exception immediately if not able to acquire lock
+ public static FileLock tryLock(FileSystem hdfs, Path fileToLock, Path lockDirPath, String spoutId)
+ throws IOException {
+ return new FileLock(hdfs, fileToLock, lockDirPath, spoutId);
+ }
+
+ /**
+ * Checks if lockFile is older than 'olderThan' UTC time by examining the modification time
+ * on the file and (if necessary) the timestamp in the last log entry in the file. If it is
+ * stale, returns the last log entry, else returns null.
+ * @param fs
+ * @param lockFile
+ * @param olderThan time (millis) in UTC.
+ * @return the last entry in the file if it is too old; null otherwise
+ * @throws IOException
+ */
+ public static LogEntry getLastEntryIfStale(FileSystem fs, Path lockFile, long olderThan)
+ throws IOException {
+ if( fs.getFileStatus(lockFile).getModificationTime() >= olderThan ) {
+ // HDFS timestamp may not reflect recent updates, so we double check the
+ // timestamp in last line of file to see when the last update was made
+ LogEntry lastEntry = getLastEntry(fs, lockFile);
+ if(lastEntry==null) {
+ throw new RuntimeException(lockFile.getName() + " is empty. This file is invalid.");
+ }
+ if( lastEntry.eventTime <= olderThan )
+ return lastEntry;
+ }
+ return null;
+ }
+
+ /**
+ * returns the last log entry
+ * @param fs
+ * @param lockFile
+ * @return
+ * @throws IOException
+ */
+ public static LogEntry getLastEntry(FileSystem fs, Path lockFile)
+ throws IOException {
+ FSDataInputStream in = fs.open(lockFile);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ String lastLine = null;
+ for(String line = reader.readLine(); line!=null; line = reader.readLine() ) {
+ lastLine=line;
+ }
+ return LogEntry.deserialize(lastLine);
+ }
+
+
+ /**
+ * Takes ownership of the lock file.
+ * @param lockFile
+ * @param lastEntry last entry in the lock file. This param is an optimization:
+ * we don't scan the lock file again to find its last entry here since
+ * that has already been done once by the logic used to check if the lock
+ * file is stale. So this value comes from that earlier scan.
+ * @param spoutId spout id
+ * @return
+ */
+ public static FileLock takeOwnership(FileSystem fs, Path lockFile, LogEntry lastEntry, String spoutId)
+ throws IOException {
+ return new FileLock(fs, lockFile, spoutId, lastEntry);
+ }
+
+ /**
+ * Finds the oldest expired lock file (using modification timestamp), then takes
+ * ownership of the lock file.
+ * Important: Assumes access to lockFilesDir has been externally synchronized such that
+ * only one thread is accessing it at a time
+ * @param fs
+ * @param lockFilesDir
+ * @param locktimeoutSec
+ * @return
+ */
+ public static FileLock acquireOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+ throws IOException {
+ // list files
+ long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+ Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
+
+ // locate oldest expired lock file (if any) and take ownership
+ for (Path file : listing) {
+ if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
+ continue;
+ LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+ if(lastEntry!=null)
+ return FileLock.takeOwnership(fs, file, lastEntry, spoutId);
+ }
+ log.info("No abandoned files found");
+ return null;
+ }
+
+
+ /**
+ * Finds the oldest expired lock file (using modification timestamp) and returns it
+ * along with its last log entry, without taking ownership of it.
+ * Important: Assumes access to lockFilesDir has been externally synchronized such that
+ * only one thread is accessing it at a time
+ * @param fs
+ * @param lockFilesDir
+ * @param locktimeoutSec
+ * @param spoutId
+ * @return a Pair<lock file path, last entry in lock file> if an expired lock file is found; null otherwise
+ * @throws IOException
+ */
+ public static HdfsUtils.Pair<Path,LogEntry> locateOldestExpiredLock(FileSystem fs, Path lockFilesDir, int locktimeoutSec, String spoutId)
+ throws IOException {
+ // list files
+ long olderThan = System.currentTimeMillis() - (locktimeoutSec*1000);
+ Collection<Path> listing = HdfsUtils.listFilesByModificationTime(fs, lockFilesDir, olderThan);
+
+ // locate oldest expired lock file (if any) and take ownership
+ for (Path file : listing) {
+ if(file.getName().equalsIgnoreCase( DirLock.DIR_LOCK_FILE) )
+ continue;
+ LogEntry lastEntry = getLastEntryIfStale(fs, file, olderThan);
+ if(lastEntry!=null)
+ return new HdfsUtils.Pair<>(file, lastEntry);
+ }
+ log.info("No abandoned files found");
+ return null;
+ }
+
+ public LogEntry getLastLogEntry() {
+ return lastEntry;
+ }
+
+ public Path getLockFile() {
+ return lockFile;
+ }
+
+ public static class LogEntry {
+ private static final int NUM_FIELDS = 3;
+ public final long eventTime;
+ public final String componentID;
+ public final String fileOffset;
+
+ public LogEntry(long eventtime, String componentID, String fileOffset) {
+ this.eventTime = eventtime;
+ this.componentID = componentID;
+ this.fileOffset = fileOffset;
+ }
+
+ public String toString() {
+ return eventTime + "," + componentID + "," + fileOffset;
+ }
+ public static LogEntry deserialize(String line) {
+ String[] fields = line.split(",", NUM_FIELDS);
+ return new LogEntry(Long.parseLong(fields[0]), fields[1], fields[2]);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof LogEntry)) return false;
+
+ LogEntry logEntry = (LogEntry) o;
+
+ if (eventTime != logEntry.eventTime) return false;
+ if (!componentID.equals(logEntry.componentID)) return false;
+ return fileOffset.equals(logEntry.fileOffset);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (eventTime ^ (eventTime >>> 32));
+ result = 31 * result + componentID.hashCode();
+ result = 31 * result + fileOffset.hashCode();
+ return result;
+ }
+ }
+}
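
A sketch of the two lifecycles this class supports, a normal path and a recovery path; the spout ids, offset strings and 5 minute timeout below are placeholders:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.spout.FileLock;

    import java.io.IOException;

    public class FileLockDemo {
      // normal path: lock a file, heartbeat progress while reading it, release when done
      static void processWithLock(FileSystem fs, Path dataFile, Path lockDir) throws IOException {
        FileLock lock = FileLock.tryLock(fs, dataFile, lockDir, "spout-0");
        try {
          lock.heartbeat("100"); // offset strings come from FileOffset.toString()
          lock.heartbeat("250");
        } finally {
          lock.release();        // closes and deletes the lock file
        }
      }

      // recovery path: reclaim a file whose owner stopped heartbeating for 5+ minutes
      static FileLock reclaim(FileSystem fs, Path lockDir) throws IOException {
        return FileLock.acquireOldestExpiredLock(fs, lockDir, 5 * 60, "spout-1");
      }
    }
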
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
new file mode 100644
index 0000000..ea8c1e1
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileOffset.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+/**
+ * Represents the notion of an offset in a file. The idea is to accommodate file
+ * offsets other than a simple byte offset, which may be insufficient for certain formats.
+ * The reader for each format implements this as appropriate for its needs.
+ * Note: Derived types must:
+ * - implement equals() & hashCode() appropriately.
+ * - implement Comparable<> appropriately.
+ * - implement toString() appropriately for serialization.
+ * - constructor(string) for deserialization
+ */
+
+interface FileOffset extends Comparable<FileOffset>, Cloneable {
+ /** tests if rhs == currOffset+1 */
+ boolean isNextOffset(FileOffset rhs);
+ public FileOffset clone();
+}
\ No newline at end of file
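
To make the contract concrete, a minimal hypothetical implementation tracking a plain record number (the readers later in this patch define richer offsets of their own):

    package org.apache.storm.hdfs.spout;

    // hypothetical minimal FileOffset: a plain record counter
    class RecordOffset implements FileOffset {
      private final long record;

      RecordOffset(long record) { this.record = record; }

      // constructor(String) for deserialization, as required by the contract
      RecordOffset(String s) { this.record = Long.parseLong(s); }

      @Override
      public boolean isNextOffset(FileOffset rhs) {
        return rhs instanceof RecordOffset && ((RecordOffset) rhs).record == record + 1;
      }

      @Override
      public int compareTo(FileOffset o) {
        return Long.compare(record, ((RecordOffset) o).record);
      }

      @Override
      public RecordOffset clone() { return new RecordOffset(record); }

      @Override
      public boolean equals(Object o) {
        return o instanceof RecordOffset && ((RecordOffset) o).record == record;
      }

      @Override
      public int hashCode() { return (int) (record ^ (record >>> 32)); }

      @Override
      public String toString() { return Long.toString(record); } // serialized form
    }
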
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
new file mode 100644
index 0000000..78284cf
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileReader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+
+interface FileReader {
+ public Path getFilePath();
+
+ /**
+ * A simple numeric value may not be sufficient for certain formats;
+ * consequently this returns a FileOffset.
+ */
+ public FileOffset getFileOffset();
+
+ /**
+ * Get the next tuple from the file
+ *
+ * @return null if no more data
+ * @throws IOException
+ */
+ public List<Object> next() throws IOException, ParseException;
+
+ public Fields getOutputFields();
+
+ public void setFields(String... fieldNames);
+
+ public void close();
+}
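
A minimal hypothetical stub satisfying this interface (it reuses the RecordOffset sketch above; a real reader would pull records from HDFS rather than fabricate them):

    package org.apache.storm.hdfs.spout;

    import backtype.storm.tuple.Fields;
    import org.apache.hadoop.fs.Path;

    import java.util.Collections;
    import java.util.List;

    // hands out three single-field tuples, then signals end of file with null
    class StubFileReader implements FileReader {
      private final Path file = new Path("/data/in/stub.txt"); // placeholder path
      private Fields fields = new Fields("line");
      private long emitted = 0;

      @Override public Path getFilePath() { return file; }

      @Override public FileOffset getFileOffset() { return new RecordOffset(emitted); }

      @Override
      public List<Object> next() {
        if (emitted >= 3) {
          return null; // contract: null means no more data
        }
        ++emitted;
        return Collections.<Object>singletonList("record-" + emitted);
      }

      @Override public Fields getOutputFields() { return fields; }

      @Override public void setFields(String... fieldNames) { this.fields = new Fields(fieldNames); }

      @Override public void close() { }
    }
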
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
new file mode 100644
index 0000000..2d4afdb
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -0,0 +1,645 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import backtype.storm.Config;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+
+public class HdfsSpout extends BaseRichSpout {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsSpout.class);
+
+ private Path sourceDirPath;
+ private Path archiveDirPath;
+ private Path badFilesDirPath;
+ private Path lockDirPath;
+
+ private int commitFrequencyCount = Configs.DEFAULT_COMMIT_FREQ_COUNT;
+ private int commitFrequencySec = Configs.DEFAULT_COMMIT_FREQ_SEC;
+ private int maxDuplicates = Configs.DEFAULT_MAX_DUPLICATES;
+ private int lockTimeoutSec = Configs.DEFAULT_LOCK_TIMEOUT;
+ private boolean clocksInSync = true;
+
+ private ProgressTracker tracker = new ProgressTracker();
+
+ private FileSystem hdfs;
+ private FileReader reader;
+
+ private SpoutOutputCollector collector;
+ HashMap<MessageId, List<Object> > inflight = new HashMap<>();
+ LinkedBlockingQueue<HdfsUtils.Pair<MessageId, List<Object>>> retryList = new LinkedBlockingQueue<>();
+
+ private String inprogress_suffix = ".inprogress";
+
+ private Configuration hdfsConfig;
+ private String readerType;
+
+ private Map conf = null;
+ private FileLock lock;
+ private String spoutId = null;
+
+ HdfsUtils.Pair<Path,FileLock.LogEntry> lastExpiredLock = null;
+ private long lastExpiredLockTime = 0;
+
+ private long tupleCounter = 0;
+ private boolean ackEnabled = false;
+ private int acksSinceLastCommit = 0 ;
+ private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
+ private final Timer commitTimer = new Timer();
+ private boolean fileReadCompletely = false;
+
+ private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs
+
+ public HdfsSpout() {
+ }
+
+ public Path getLockDirPath() {
+ return lockDirPath;
+ }
+
+ public SpoutOutputCollector getCollector() {
+ return collector;
+ }
+
+ public HdfsSpout withConfigKey(String configKey){
+ this.configKey = configKey;
+ return this;
+ }
+
+ public void nextTuple() {
+ LOG.debug("Next Tuple");
+ // 1) First re-emit any previously failed tuples (from retryList)
+ if (!retryList.isEmpty()) {
+ LOG.debug("Sending from retry list");
+ HdfsUtils.Pair<MessageId, List<Object>> pair = retryList.remove();
+ emitData(pair.getValue(), pair.getKey());
+ return;
+ }
+
+ if( ackEnabled && tracker.size()>=maxDuplicates ) {
+ LOG.warn("Waiting for more ACKs before generating new tuples. " +
+ "Progress tracker size has reached limit {}"
+ , maxDuplicates);
+ // Don't emit anything .. allow configured spout wait strategy to kick in
+ return;
+ }
+
+ // 2) If no failed tuples, then send tuples from hdfs
+ while (true) {
+ try {
+ // 3) Select a new file if one is not open already
+ if (reader == null) {
+ reader = pickNextFile();
+ if (reader == null) {
+ LOG.info("Currently no new files to process under : " + sourceDirPath);
+ return;
+ }
+ }
+
+ // 4) Read record from file, emit to collector and record progress
+ List<Object> tuple = reader.next();
+ if (tuple != null) {
+ fileReadCompletely= false;
+ ++tupleCounter;
+ MessageId msgId = new MessageId(tupleCounter, reader.getFilePath(), reader.getFileOffset());
+ emitData(tuple, msgId);
+
+ if(!ackEnabled) {
+ ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode
+ commitProgress(reader.getFileOffset());
+ } else {
+ commitProgress(tracker.getCommitPosition());
+ }
+ return;
+ } else {
+ fileReadCompletely = true;
+ if(!ackEnabled) {
+ markFileAsDone(reader.getFilePath());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("I/O Error processing at file location " + getFileProgress(reader), e);
+ // don't emit anything .. allow configured spout wait strategy to kick in
+ return;
+ } catch (ParseException e) {
+ LOG.error("Parsing error when processing at file location " + getFileProgress(reader) +
+ ". Skipping remainder of file.", e);
+ markFileAsBad(reader.getFilePath());
+ // note: Unfortunately, since nothing is emitted here after a parse error,
+ // the configured spout wait strategy will kick in, which is unnecessary
+ }
+ }
+
+ }
+
+ // will commit progress into lock file if commit threshold is reached
+ private void commitProgress(FileOffset position) {
+ if ( lock!=null && canCommitNow() ) {
+ try {
+ lock.heartbeat(position.toString());
+ acksSinceLastCommit = 0;
+ commitTimeElapsed.set(false);
+ setupCommitElapseTimer();
+ } catch (IOException e) {
+ LOG.error("Unable to commit progress Will retry later.", e);
+ }
+ }
+ }
+
+ private void setupCommitElapseTimer() {
+ if(commitFrequencySec<=0)
+ return;
+ TimerTask timerTask = new TimerTask() {
+ @Override
+ public void run() {
+ commitTimeElapsed.set(true); // signal that the commit interval has elapsed
+ }
+ };
+ commitTimer.schedule(timerTask, commitFrequencySec * 1000);
+ }
+
+
+ private static String getFileProgress(FileReader reader) {
+ return reader.getFilePath() + " " + reader.getFileOffset();
+ }
+
+ private void markFileAsDone(Path filePath) {
+ fileReadCompletely = false;
+ try {
+ renameCompletedFile(reader.getFilePath());
+ } catch (IOException e) {
+ LOG.error("Unable to archive completed file" + filePath, e);
+ }
+ unlockAndCloseReader();
+
+ }
+
+ private void markFileAsBad(Path file) {
+ String fileName = file.toString();
+ String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+ String originalName = new Path(fileNameMinusSuffix).getName();
+ Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
+
+ LOG.info("Moving bad file to " + newFile);
+ try {
+ if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
+ throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
+ }
+ } catch (IOException e) {
+ LOG.warn("Error moving bad file: " + file + ". to destination : " + newFile);
+ }
+
+ unlockAndCloseReader();
+ }
+
+ private void unlockAndCloseReader() {
+ reader.close();
+ reader = null;
+ try {
+ lock.release();
+ } catch (IOException e) {
+ LOG.error("Unable to delete lock file : " + this.lock.getLockFile(), e);
+ }
+ lock = null;
+ }
+
+
+
+ protected void emitData(List<Object> tuple, MessageId id) {
+ LOG.debug("Emitting - {}", id);
+ this.collector.emit(tuple, id);
+ inflight.put(id, tuple);
+ }
+
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.conf = conf;
+ final String FILE_SYSTEM = "filesystem";
+ LOG.info("Opening");
+ this.collector = collector;
+ this.hdfsConfig = new Configuration();
+ this.tupleCounter = 0;
+
+ for( Object k : conf.keySet() ) {
+ String key = k.toString();
+ if( ! FILE_SYSTEM.equalsIgnoreCase( key ) ) { // to support unit test only
+ String val = conf.get(key).toString();
+ LOG.info("Config setting : " + key + " = " + val);
+ this.hdfsConfig.set(key, val);
+ }
+ else
+ this.hdfs = (FileSystem) conf.get(key);
+
+ if(key.equalsIgnoreCase(Configs.READER_TYPE)) {
+ readerType = conf.get(key).toString();
+ checkValidReader(readerType);
+ }
+ }
+
+ // - Hdfs configs (apply settings from the map under configKey on top of the above)
+ Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
+ if(map != null){
+ for(String key : map.keySet()){
+ this.hdfsConfig.set(key, String.valueOf(map.get(key)));
+ }
+ }
+
+ try {
+ HdfsSecurityUtil.login(conf, hdfsConfig);
+ } catch (IOException e) {
+ LOG.error("Failed to open " + sourceDirPath);
+ throw new RuntimeException(e);
+ }
+
+ // -- source dir config
+ if ( !conf.containsKey(Configs.SOURCE_DIR) ) {
+ LOG.error(Configs.SOURCE_DIR + " setting is required");
+ throw new RuntimeException(Configs.SOURCE_DIR + " setting is required");
+ }
+ this.sourceDirPath = new Path( conf.get(Configs.SOURCE_DIR).toString() );
+
+ // -- archive dir config
+ if ( !conf.containsKey(Configs.ARCHIVE_DIR) ) {
+ LOG.error(Configs.ARCHIVE_DIR + " setting is required");
+ throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
+ }
+ this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() );
+
+ try {
+ if(hdfs.exists(archiveDirPath)) {
+ if(! hdfs.isDirectory(archiveDirPath) ) {
+ LOG.error("Archive directory is a file. " + archiveDirPath);
+ throw new RuntimeException("Archive directory is a file. " + archiveDirPath);
+ }
+ } else if(! hdfs.mkdirs(archiveDirPath) ) {
+ LOG.error("Unable to create archive directory. " + archiveDirPath);
+ throw new RuntimeException("Unable to create archive directory " + archiveDirPath);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to create archive dir ", e);
+ throw new RuntimeException("Unable to create archive directory ", e);
+ }
+
+ // -- bad files dir config
+ if ( !conf.containsKey(Configs.BAD_DIR) ) {
+ LOG.error(Configs.BAD_DIR + " setting is required");
+ throw new RuntimeException(Configs.BAD_DIR + " setting is required");
+ }
+
+ this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString());
+
+ try {
+ if(hdfs.exists(badFilesDirPath)) {
+ if(! hdfs.isDirectory(badFilesDirPath) ) {
+ LOG.error("Bad files directory is a file: " + badFilesDirPath);
+ throw new RuntimeException("Bad files directory is a file: " + badFilesDirPath);
+ }
+ } else if(! hdfs.mkdirs(badFilesDirPath) ) {
+ LOG.error("Unable to create directory for bad files: " + badFilesDirPath);
+ throw new RuntimeException("Unable to create a directory for bad files: " + badFilesDirPath);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to create archive dir ", e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ // -- lock dir config
+ String lockDir = !conf.containsKey(Configs.LOCK_DIR) ? getDefaultLockDir(sourceDirPath) : conf.get(Configs.LOCK_DIR).toString() ;
+ this.lockDirPath = new Path(lockDir);
+
+ try {
+ if(hdfs.exists(lockDirPath)) {
+ if(! hdfs.isDirectory(lockDirPath) ) {
+ LOG.error("Lock directory is a file: " + lockDirPath);
+ throw new RuntimeException("Lock directory is a file: " + lockDirPath);
+ }
+ } else if(! hdfs.mkdirs(lockDirPath) ) {
+ LOG.error("Unable to create lock directory: " + lockDirPath);
+ throw new RuntimeException("Unable to create lock directory: " + lockDirPath);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to create lock dir: " + lockDirPath, e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ // -- lock timeout
+ if( conf.get(Configs.LOCK_TIMEOUT) !=null )
+ this.lockTimeoutSec = Integer.parseInt(conf.get(Configs.LOCK_TIMEOUT).toString());
+
+ // -- enable/disable ACKing
+ Object ackers = conf.get(Config.TOPOLOGY_ACKER_EXECUTORS);
+ if( ackers!=null )
+ this.ackEnabled = ( Integer.parseInt( ackers.toString() ) > 0 );
+ else
+ this.ackEnabled = false;
+
+ // -- commit frequency - count
+ if( conf.get(Configs.COMMIT_FREQ_COUNT) != null )
+ commitFrequencyCount = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_COUNT).toString() );
+
+ // -- commit frequency - seconds
+ if( conf.get(Configs.COMMIT_FREQ_SEC) != null )
+ commitFrequencySec = Integer.parseInt( conf.get(Configs.COMMIT_FREQ_SEC).toString() );
+
+ // -- max duplicate
+ if( conf.get(Configs.MAX_DUPLICATE) !=null )
+ maxDuplicates = Integer.parseInt( conf.get(Configs.MAX_DUPLICATE).toString() );
+
+ // -- clocks in sync
+ if( conf.get(Configs.CLOCKS_INSYNC) !=null )
+ clocksInSync = Boolean.parseBoolean(conf.get(Configs.CLOCKS_INSYNC).toString());
+
+ // -- spout id
+ spoutId = context.getThisComponentId();
+
+ // setup timer for commit elapse time tracking
+ setupCommitElapseTimer();
+ }
+
+ private String getDefaultLockDir(Path sourceDirPath) {
+ return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
+ }
+
+ private static void checkValidReader(String readerType) {
+ if(readerType.equalsIgnoreCase(Configs.TEXT) || readerType.equalsIgnoreCase(Configs.SEQ) )
+ return;
+ try {
+ Class<?> classType = Class.forName(readerType);
+ classType.getConstructor(FileSystem.class, Path.class, Map.class);
+ return;
+ } catch (ClassNotFoundException e) {
+ LOG.error(readerType + " not found in classpath.", e);
+ throw new IllegalArgumentException(readerType + " not found in classpath.", e);
+ } catch (NoSuchMethodException e) {
+ LOG.error(readerType + " is missing the expected constructor for Readers.", e);
+ throw new IllegalArgumentException(readerType + " is missing the expected constuctor for Readers.");
+ }
+ }
+
+ @Override
+ public void ack(Object msgId) {
+ MessageId id = (MessageId) msgId;
+ inflight.remove(id);
+ ++acksSinceLastCommit;
+ tracker.recordAckedOffset(id.offset);
+ commitProgress(tracker.getCommitPosition());
+ if(fileReadCompletely) {
+ markFileAsDone(reader.getFilePath());
+ reader = null;
+ }
+ super.ack(msgId);
+ }
+
+ private boolean canCommitNow() {
+ if( acksSinceLastCommit >= commitFrequencyCount )
+ return true;
+ return commitTimeElapsed.get();
+ }
+
+ @Override
+ public void fail(Object msgId) {
+ super.fail(msgId);
+ HdfsUtils.Pair<MessageId, List<Object>> item = HdfsUtils.Pair.of(msgId, inflight.remove(msgId));
+ retryList.add(item);
+ }
+
+ private FileReader pickNextFile() {
+ try {
+ // 1) If there are any abandoned files, pick oldest one
+ lock = getOldestExpiredLock();
+ if (lock != null) {
+ Path file = getFileForLockFile(lock.getLockFile(), sourceDirPath);
+ String resumeFromOffset = lock.getLastLogEntry().fileOffset;
+ LOG.info("Processing abandoned file : {}", file);
+ return createFileReader(file, resumeFromOffset);
+ }
+
+ // 2) If no abandoned files, then pick oldest file in sourceDirPath, lock it and rename it
+ Collection<Path> listing = HdfsUtils.listFilesByModificationTime(hdfs, sourceDirPath, 0);
+
+ for (Path file : listing) {
+ if( file.getName().contains(inprogress_suffix) )
+ continue;
+ LOG.info("Processing : {} ", file);
+ lock = FileLock.tryLock(hdfs, file, lockDirPath, spoutId);
+ if( lock==null ) {
+ LOG.info("Unable to get lock, so skipping file: {}", file);
+ continue; // could not lock, so try another file.
+ }
+ Path newFile = renameSelectedFile(file);
+ return createFileReader(newFile);
+ }
+
+ return null;
+ } catch (IOException e) {
+ LOG.error("Unable to select next file for consumption " + sourceDirPath, e);
+ return null;
+ }
+ }
+
+ /**
+ * If clocks are in sync, then acquires the oldest expired lock.
+ * Else, on the first call, just remembers the oldest expired lock; on the next call,
+ * checks if that lock has since been updated. If not updated, acquires the lock.
+ * @return
+ * @throws IOException
+ */
+ private FileLock getOldestExpiredLock() throws IOException {
+ // 1 - acquire lock on dir
+ DirLock dirlock = DirLock.tryLock(hdfs, lockDirPath);
+ if (dirlock == null)
+ return null;
+ try {
+ // 2 - if clocks are in sync then simply take ownership of the oldest expired lock
+ if (clocksInSync)
+ return FileLock.acquireOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+
+ // 3 - if clocks are not in sync ..
+ if( lastExpiredLock == null ) {
+ // just make a note of the oldest expired lock now and check if it is still unmodified after lockTimeoutSec
+ lastExpiredLock = FileLock.locateOldestExpiredLock(hdfs, lockDirPath, lockTimeoutSec, spoutId);
+ lastExpiredLockTime = System.currentTimeMillis();
+ return null;
+ }
+ // wait until lockTimeoutSec has elapsed since we last noted the lock file
+ if( !hasExpired(lastExpiredLockTime) )
+ return null;
+
+ // If lock file has expired, then own it
+ FileLock.LogEntry lastEntry = FileLock.getLastEntry(hdfs, lastExpiredLock.getKey());
+ if( lastEntry.equals(lastExpiredLock.getValue()) ) {
+ FileLock result = FileLock.takeOwnership(hdfs, lastExpiredLock.getKey(), lastEntry, spoutId);
+ lastExpiredLock = null;
+ return result;
+ } else {
+ // if lock file has been updated since last time, then leave this lock file alone
+ lastExpiredLock = null;
+ return null;
+ }
+ } finally {
+ dirlock.release();
+ }
+ }
+
+ private boolean hasExpired(long lastModifyTime) {
+ return (System.currentTimeMillis() - lastModifyTime) >= lockTimeoutSec*1000;
+ }
+
+ /**
+ * Creates a reader that reads from beginning of file
+ * @param file file to read
+ * @return
+ * @throws IOException
+ */
+ private FileReader createFileReader(Path file)
+ throws IOException {
+ if(readerType.equalsIgnoreCase(Configs.SEQ))
+ return new SequenceFileReader(this.hdfs, file, conf);
+ if(readerType.equalsIgnoreCase(Configs.TEXT))
+ return new TextFileReader(this.hdfs, file, conf);
+
+ try {
+ Class<?> clsType = Class.forName(readerType);
+ Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class);
+ return (FileReader) constructor.newInstance(this.hdfs, file, conf);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException("Unable to instantiate " + readerType, e);
+ }
+ }
+
+
+ /**
+ * Creates a reader that starts reading from 'offset'
+ * @param file the file to read
+ * @param offset the offset string should be understandable by the reader type being used
+ * @return
+ * @throws IOException
+ */
+ private FileReader createFileReader(Path file, String offset)
+ throws IOException {
+ if(readerType.equalsIgnoreCase(Configs.SEQ))
+ return new SequenceFileReader(this.hdfs, file, conf, offset);
+ if(readerType.equalsIgnoreCase(Configs.TEXT))
+ return new TextFileReader(this.hdfs, file, conf, offset);
+
+ try {
+ Class<?> clsType = Class.forName(readerType);
+ Constructor<?> constructor = clsType.getConstructor(FileSystem.class, Path.class, Map.class, String.class);
+ return (FileReader) constructor.newInstance(this.hdfs, file, conf, offset);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException("Unable to instantiate " + readerType, e);
+ }
+ }
+
+ // returns new path of renamed file
+ private Path renameSelectedFile(Path file)
+ throws IOException {
+ Path newFile = new Path( file.toString() + inprogress_suffix );
+ if( ! hdfs.rename(file, newFile) ) {
+ throw new IOException("Rename failed for file: " + file);
+ }
+ return newFile;
+ }
+
+ /** Returns the corresponding input file in the 'sourceDirPath' for the specified lock file.
+ * If no such file is found then returns null
+ */
+ private Path getFileForLockFile(Path lockFile, Path sourceDirPath)
+ throws IOException {
+ String lockFileName = lockFile.getName();
+ Path dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName + inprogress_suffix);
+ if( hdfs.exists(dataFile) )
+ return dataFile;
+ dataFile = new Path(sourceDirPath + Path.SEPARATOR + lockFileName);
+ if(hdfs.exists(dataFile))
+ return dataFile;
+ return null;
+ }
+
+
+ private Path renameCompletedFile(Path file) throws IOException {
+ String fileName = file.toString();
+ String fileNameMinusSuffix = fileName.substring(0, fileName.indexOf(inprogress_suffix));
+ String newName = new Path(fileNameMinusSuffix).getName();
+
+ Path newFile = new Path( archiveDirPath + Path.SEPARATOR + newName );
+ LOG.debug("Renaming complete file to " + newFile);
+ LOG.info("Completed file " + fileNameMinusSuffix );
+ if (!hdfs.rename(file, newFile) ) {
+ throw new IOException("Rename failed for file: " + file);
+ }
+ return newFile;
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ Fields fields = reader.getOutputFields();
+ declarer.declare(fields);
+ }
+
+ static class MessageId implements Comparable<MessageId> {
+ public long msgNumber; // tracks order in which msg came in
+ public String fullPath;
+ public FileOffset offset;
+
+ public MessageId(long msgNumber, Path fullPath, FileOffset offset) {
+ this.msgNumber = msgNumber;
+ this.fullPath = fullPath.toString();
+ this.offset = offset;
+ }
+
+ @Override
+ public String toString() {
+ return "{'" + fullPath + "':" + offset + "}";
+ }
+
+ @Override
+ public int compareTo(MessageId rhs) {
+ if (msgNumber<rhs.msgNumber)
+ return -1;
+ if(msgNumber>rhs.msgNumber)
+ return 1;
+ return 0;
+ }
+ }
+
+}
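
A sketch of the intended topology wiring for this spout, using the same backtype.storm API imported above; the directory paths are placeholders:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import org.apache.storm.hdfs.spout.Configs;
    import org.apache.storm.hdfs.spout.HdfsSpout;

    public class HdfsSpoutTopologyDemo {
      public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("hdfs-spout", new HdfsSpout(), 1);
        // downstream bolts attach here, e.g. builder.setBolt(...).shuffleGrouping("hdfs-spout")

        Config conf = new Config();
        conf.put(Configs.READER_TYPE, Configs.TEXT);
        conf.put(Configs.SOURCE_DIR, "/data/in");
        conf.put(Configs.ARCHIVE_DIR, "/data/done");
        conf.put(Configs.BAD_DIR, "/data/bad");
        conf.setNumAckers(1); // >0 switches open() into the ACK-driven progress tracking path

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("hdfs-spout-demo", conf, builder.createTopology());
      }
    }
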
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
new file mode 100644
index 0000000..fdf7751f
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ParseException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+public class ParseException extends Exception {
+ public ParseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
new file mode 100644
index 0000000..2079ef4
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/ProgressTracker.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import java.io.PrintStream;
+import java.util.TreeSet;
+
+public class ProgressTracker {
+
+ TreeSet<FileOffset> offsets = new TreeSet<>();
+
+ public void recordAckedOffset(FileOffset newOffset) {
+ if(newOffset==null)
+ return;
+ offsets.add(newOffset);
+
+ // trim unconditionally: an out-of-order ack (e.g. ack of 2 before ack of 1) can make
+ // the head run contiguous even when newOffset itself is not head+1
+ trimHead();
+ }
+
+ // remove contiguous elements from the head of the set
+ // e.g.: 1,2,3,4,10,11,12,15 => 4,10,11,12,15
+ private void trimHead() {
+ if(offsets.size()<=1)
+ return;
+ FileOffset head = offsets.first();
+ FileOffset head2 = offsets.higher(head);
+ if( head.isNextOffset(head2) ) {
+ offsets.pollFirst();
+ trimHead();
+ }
+ return;
+ }
+
+ public FileOffset getCommitPosition() {
+ if(!offsets.isEmpty())
+ return offsets.first().clone();
+ return null;
+ }
+
+ public void dumpState(PrintStream stream) {
+ stream.println(offsets);
+ }
+
+ public int size() {
+ return offsets.size();
+ }
+}
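
A walk-through of the commit-position semantics, using the hypothetical RecordOffset sketch from the FileOffset section; ACKs may arrive out of order, and getCommitPosition() returns the tail of the contiguous acked run at the head, i.e. the highest point that is safe to commit:

    package org.apache.storm.hdfs.spout;

    public class ProgressTrackerDemo {
      public static void main(String[] args) {
        ProgressTracker tracker = new ProgressTracker();
        tracker.recordAckedOffset(new RecordOffset(1));
        tracker.recordAckedOffset(new RecordOffset(2));
        tracker.recordAckedOffset(new RecordOffset(5)); // gap: 3 and 4 are still unacked
        System.out.println(tracker.getCommitPosition()); // 2 -- cannot commit past the gap
        tracker.recordAckedOffset(new RecordOffset(3));
        tracker.recordAckedOffset(new RecordOffset(4));
        System.out.println(tracker.getCommitPosition()); // 5 -- the gap is closed
      }
    }
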
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
new file mode 100644
index 0000000..5ff7b75
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/SequenceFileReader.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+// Todo: Track file offsets instead of line number
+public class SequenceFileReader<Key extends Writable,Value extends Writable>
+ extends AbstractFileReader {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SequenceFileReader.class);
+ private static final int DEFAULT_BUFF_SIZE = 4096;
+ public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
+
+ private final SequenceFile.Reader reader;
+
+ private final SequenceFileReader.Offset offset;
+
+ private static final String DEFAULT_KEYNAME = "key";
+ private static final String DEFAULT_VALNAME = "value";
+
+ private String keyName;
+ private String valueName;
+
+
+ private final Key key;
+ private final Value value;
+
+
+ public SequenceFileReader(FileSystem fs, Path file, Map conf)
+ throws IOException {
+ super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME));
+ this.keyName = DEFAULT_KEYNAME;
+ this.valueName = DEFAULT_VALNAME;
+ int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+ this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
+ this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() );
+ this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() );
+ this.offset = new SequenceFileReader.Offset(0,0,0);
+ }
+
+ public SequenceFileReader(FileSystem fs, Path file, Map conf, String offset)
+ throws IOException {
+ super(fs, file, new Fields(DEFAULT_KEYNAME, DEFAULT_VALNAME));
+ this.keyName = DEFAULT_KEYNAME;
+ this.valueName = DEFAULT_VALNAME;
+ int bufferSize = !conf.containsKey(BUFFER_SIZE) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+ this.offset = new SequenceFileReader.Offset(offset);
+ this.reader = new SequenceFile.Reader(fs.getConf(), SequenceFile.Reader.file(file), SequenceFile.Reader.bufferSize(bufferSize) );
+ this.key = (Key) ReflectionUtils.newInstance(reader.getKeyClass(), fs.getConf() );
+ this.value = (Value) ReflectionUtils.newInstance(reader.getValueClass(), fs.getConf() );
+ // sync() only positions the reader at the sync marker; skip the records
+ // already consumed past it so a resumed reader does not re-emit them
+ this.reader.sync(this.offset.lastSyncPoint);
+ for (long i = 0; i < this.offset.recordsSinceLastSync; ++i) {
+ this.reader.next(this.key, this.value);
+ }
+ }
+
+ public String getKeyName() {
+ return keyName;
+ }
+
+ public void setKeyName(String name) {
+ if (name == null)
+ throw new IllegalArgumentException("keyName cannot be null");
+ this.keyName = name;
+ setFields(keyName, valueName);
+ }
+
+ public String getValueName() {
+ return valueName;
+ }
+
+ public void setValueName(String name) {
+ if (name == null)
+ throw new IllegalArgumentException("valueName cannot be null");
+ this.valueName = name;
+ setFields(keyName, valueName);
+ }
+
+ public List<Object> next() throws IOException, ParseException {
+ if( reader.next(key, value) ) {
+ ArrayList<Object> result = new ArrayList<Object>(2);
+ Collections.addAll(result, key, value);
+ offset.increment(reader.syncSeen(), reader.getPosition() );
+ return result;
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+ }
+ }
+
+ public Offset getFileOffset() {
+ return offset.clone(); // hand out a copy; the internal offset mutates on every next()
+ }
+
+
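+ /**
+ * Position within a sequence file, tracked as the byte offset of the last
+ * sync marker seen plus the number of records consumed after it. A reader
+ * resumes by sync()ing to lastSyncPoint and skipping recordsSinceLastSync
+ * records (see the resume constructor above).
+ */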
+ public static class Offset implements FileOffset {
+ private long lastSyncPoint;
+ private long recordsSinceLastSync;
+ private long currentRecord;
+ private long currRecordEndOffset;
+ private long prevRecordEndOffset;
+
+ public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord) {
+ this(lastSyncPoint, recordsSinceLastSync, currentRecord, 0, 0 );
+ }
+
+ public Offset(long lastSyncPoint, long recordsSinceLastSync, long currentRecord
+ , long currRecordEndOffset, long prevRecordEndOffset) {
+ this.lastSyncPoint = lastSyncPoint;
+ this.recordsSinceLastSync = recordsSinceLastSync;
+ this.currentRecord = currentRecord;
+ this.prevRecordEndOffset = prevRecordEndOffset;
+ this.currRecordEndOffset = currRecordEndOffset;
+ }
+
+ public Offset(String offset) {
+ try {
+ // toString() emits {sync=<n>:afterSync=<n>:record=<n>}; strip the
+ // braces, then split on ':' so every field parses as a long
+ String[] parts = offset.substring(1, offset.length()-1).split(":");
+ this.lastSyncPoint = Long.parseLong(parts[0].split("=")[1]);
+ this.recordsSinceLastSync = Long.parseLong(parts[1].split("=")[1]);
+ this.currentRecord = Long.parseLong(parts[2].split("=")[1]);
+ this.prevRecordEndOffset = 0;
+ this.currRecordEndOffset = 0;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("'" + offset +
+ "' cannot be interpreted. It is not in expected format for SequenceFileReader." +
+ " Format e.g. {sync=123:afterSync=345:record=67}");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return '{' +
+ "sync=" + lastSyncPoint +
+ ":afterSync=" + recordsSinceLastSync +
+ ":record=" + currentRecord +
+ '}';
+ }
+
+ @Override
+ public boolean isNextOffset(FileOffset rhs) {
+ if(rhs instanceof Offset) {
+ Offset other = ((Offset) rhs);
+ // rhs is the immediate successor iff it is exactly one record ahead
+ return other.currentRecord == currentRecord+1;
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(FileOffset o) {
+ Offset rhs = ((Offset) o);
+ if(currentRecord<rhs.currentRecord)
+ return -1;
+ if(currentRecord==rhs.currentRecord)
+ return 0;
+ return 1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Offset)) return false;
+
+ Offset offset = (Offset) o;
+
+ return currentRecord == offset.currentRecord;
+ }
+
+ @Override
+ public int hashCode() {
+ return (int) (currentRecord ^ (currentRecord >>> 32));
+ }
+
+ void increment(boolean syncSeen, long newBytePosition) {
+ if(!syncSeen) {
+ ++recordsSinceLastSync;
+ } else {
+ recordsSinceLastSync = 1;
+ lastSyncPoint = prevRecordEndOffset;
+ }
+ ++currentRecord;
+ prevRecordEndOffset = currRecordEndOffset;
+ currRecordEndOffset = newBytePosition; // remember where this record ended for the next sync calculation
+ }
+
+ @Override
+ public Offset clone() {
+ return new Offset(lastSyncPoint, recordsSinceLastSync, currentRecord, currRecordEndOffset, prevRecordEndOffset);
+ }
+
+ } //class Offset
+} //class
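A minimal usage sketch for the reader above: read a sequence file of
Text/Text pairs, checkpoint the position as a string, and resume from that
string in a later run. The FileSystem handle, the file path, and the
key/value types are hypothetical stand-ins, not part of this patch:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.storm.hdfs.spout.SequenceFileReader;

    public class SequenceReaderSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration()); // hypothetical HDFS handle
        Path file = new Path("/data/events.seq");            // hypothetical sequence file
        Map conf = new HashMap();
        conf.put(SequenceFileReader.BUFFER_SIZE, "8192");    // optional read-buffer override

        SequenceFileReader<Text, Text> reader = new SequenceFileReader<Text, Text>(fs, file, conf);
        String checkpoint = null;
        List<Object> kv;
        while ((kv = reader.next()) != null) {
          System.out.println(kv.get(0) + " -> " + kv.get(1));
          checkpoint = reader.getFileOffset().toString();    // e.g. {sync=123:afterSync=345:record=67}
        }
        reader.close();

        // a later run resumes from the persisted checkpoint string
        if (checkpoint != null) {
          SequenceFileReader<Text, Text> resumed =
              new SequenceFileReader<Text, Text>(fs, file, conf, checkpoint);
          resumed.close();
        }
      }
    }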
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
new file mode 100644
index 0000000..6e4a8b0
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.tuple.Fields;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+// Todo: Track file offsets instead of line number
+class TextFileReader extends AbstractFileReader {
+ public static final String CHARSET = "hdfsspout.reader.charset";
+ public static final String BUFFER_SIZE = "hdfsspout.reader.buffer.bytes";
+
+ public static final String DEFAULT_FIELD_NAME = "line";
+
+ private static final int DEFAULT_BUFF_SIZE = 4096;
+
+ private BufferedReader reader;
+ private String charset; // remembered so next() can measure line bytes in the stream's encoding
+ private final Logger LOG = LoggerFactory.getLogger(TextFileReader.class);
+ private TextFileReader.Offset offset;
+
+ public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException {
+ super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+ FSDataInputStream in = fs.open(file);
+ charset = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
+ int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+ reader = new BufferedReader(new InputStreamReader(in, charset), buffSz);
+ offset = new TextFileReader.Offset(0,0);
+ }
+
+ public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException {
+ super(fs, file, new Fields(DEFAULT_FIELD_NAME));
+ offset = new TextFileReader.Offset(startOffset);
+ FSDataInputStream in = fs.open(file);
+ in.seek(offset.byteOffset);
+ charset = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
+ int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
+ reader = new BufferedReader(new InputStreamReader(in, charset), buffSz);
+ }
+
+ public Offset getFileOffset() {
+ return offset.clone();
+ }
+
+ public List<Object> next() throws IOException, ParseException {
+ String line = reader.readLine();
+ if(line!=null) {
+ // measure bytes in the stream's charset, not the platform default, and
+ // count the terminator stripped by readLine() (assumes '\n' endings)
+ int lineByteSize = line.getBytes(charset).length + 1;
+ offset.increment(lineByteSize);
+ return Collections.singletonList((Object) line);
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ LOG.warn("Ignoring error when closing file " + getFilePath(), e);
+ }
+ }
+
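+ /**
+ * Position within a text file: the byte offset to seek() to on resume and
+ * the number of lines already emitted before that point.
+ */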
+ public static class Offset implements FileOffset {
+ long byteOffset;
+ long lineNumber;
+
+ public Offset(long byteOffset, long lineNumber) {
+ this.byteOffset = byteOffset;
+ this.lineNumber = lineNumber;
+ }
+
+ public Offset(String offset) {
+ try {
+ // toString() emits {byte=<n>:line=<n>}; strip the braces before
+ // splitting so the trailing field parses as a long
+ String[] parts = offset.substring(1, offset.length()-1).split(":");
+ this.byteOffset = Long.parseLong(parts[0].split("=")[1]);
+ this.lineNumber = Long.parseLong(parts[1].split("=")[1]);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("'" + offset +
+ "' cannot be interpreted. It is not in expected format for TextFileReader." +
+ " Format e.g. {byte=123:line=5}");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return '{' +
+ "byte=" + byteOffset +
+ ":line=" + lineNumber +
+ '}';
+ }
+
+ @Override
+ public boolean isNextOffset(FileOffset rhs) {
+ if(rhs instanceof Offset) {
+ Offset other = ((Offset) rhs);
+ return other.byteOffset > byteOffset &&
+ other.lineNumber == lineNumber+1;
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(FileOffset o) {
+ Offset rhs = ((Offset)o);
+ if(lineNumber < rhs.lineNumber)
+ return -1;
+ if(lineNumber == rhs.lineNumber)
+ return 0;
+ return 1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Offset)) return false;
+
+ Offset that = (Offset) o;
+
+ if (byteOffset != that.byteOffset)
+ return false;
+ return lineNumber == that.lineNumber;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (byteOffset ^ (byteOffset >>> 32));
+ result = 31 * result + (int) (lineNumber ^ (lineNumber >>> 32));
+ return result;
+ }
+
+ void increment(int delta) {
+ ++lineNumber;
+ byteOffset += delta;
+ }
+
+ @Override
+ public Offset clone() {
+ return new Offset(byteOffset, lineNumber);
+ }
+ } //class Offset
+}
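A matching sketch for TextFileReader. Since the class is package private it
can only be driven from within org.apache.storm.hdfs.spout; the file path
and config values here are hypothetical:

    package org.apache.storm.hdfs.spout;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TextReaderSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration()); // hypothetical HDFS handle
        Path file = new Path("/data/app.log");               // hypothetical text file
        Map conf = new HashMap();
        conf.put(TextFileReader.CHARSET, "UTF-8");

        // read the first 100 lines, then remember where we stopped
        TextFileReader reader = new TextFileReader(fs, file, conf);
        List<Object> line;
        int count = 0;
        while (count < 100 && (line = reader.next()) != null) {
          System.out.println(line.get(0));
          count++;
        }
        String checkpoint = reader.getFileOffset().toString(); // e.g. {byte=123:line=5}
        reader.close();

        // a later run seeks straight to the saved byte offset
        TextFileReader resumed = new TextFileReader(fs, file, conf, checkpoint);
        while ((line = resumed.next()) != null) {
          System.out.println(line.get(0));
        }
        resumed.close();
      }
    }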
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
new file mode 100644
index 0000000..ea4b3a3
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestDirLock.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+public class TestDirLock {
+
+
+ static MiniDFSCluster.Builder builder;
+ static MiniDFSCluster hdfsCluster;
+ static FileSystem fs;
+ static String hdfsURI;
+ static Configuration conf = new HdfsConfiguration();
+
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+ private Path lockDir = new Path("/tmp/lockdir");
+
+
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ builder = new MiniDFSCluster.Builder(new Configuration());
+ hdfsCluster = builder.build();
+ fs = hdfsCluster.getFileSystem();
+ hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+ }
+
+ @AfterClass
+ public static void teardownClass() throws IOException {
+ fs.close();
+ hdfsCluster.shutdown();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ Assert.assertTrue(fs.mkdirs(lockDir)); // 'assert' is a no-op unless the JVM runs with -ea
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.delete(lockDir, true);
+ }
+
+ @Test
+ public void testConcurrentLocking() throws Exception {
+ Logger.getRootLogger().setLevel(Level.ERROR); // keep MiniDFSCluster logging quiet
+ DirLockingThread[] thds = createThreads(10, lockDir);
+ for (DirLockingThread thd : thds) {
+ thd.start();
+ }
+ System.err.println("Thread creation complete");
+ Thread.sleep(5000);
+ for (DirLockingThread thd : thds) {
+ thd.join(1000);
+ if(thd.isAlive() || !thd.cleanExit)
+ System.err.println(thd.getName() + " did not exit cleanly");
+ Assert.assertTrue(thd.cleanExit);
+ }
+
+ Path lockFile = new Path(lockDir + Path.SEPARATOR + DirLock.DIR_LOCK_FILE);
+ Assert.assertFalse(fs.exists(lockFile));
+ }
+
+
+
+ // creates (but does not start) the locking threads
+ private DirLockingThread[] createThreads(int thdCount, Path dir)
+ throws IOException {
+ DirLockingThread[] result = new DirLockingThread[thdCount];
+ for (int i = 0; i < thdCount; i++) {
+ result[i] = new DirLockingThread(i, fs, dir);
+ }
+ return result;
+ }
+
+
+ class DirLockingThread extends Thread {
+
+ private final FileSystem fs;
+ private final Path dir;
+ public boolean cleanExit = false;
+
+ public DirLockingThread(int thdNum, FileSystem fs, Path dir) throws IOException {
+ this.fs = fs;
+ this.dir = dir;
+ setName("DirLockingThread-" + thdNum); // name this thread, not the one running the constructor
+ }
+
+ @Override
+ public void run() {
+ try {
+ DirLock lock;
+ do {
+ lock = DirLock.tryLock(fs, dir);
+ if(lock==null) {
+ System.out.println("Retrying lock - " + Thread.currentThread().getId());
+ }
+ } while (lock==null);
+ lock.release();
+ cleanExit = true;
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+}
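The pattern the test exercises, as a standalone sketch: DirLock.tryLock()
returns null while another worker holds the lock, so callers retry or back
off, and release() must run even if the guarded work fails. The directory
path and the work inside the try block are hypothetical; the sketch sits in
org.apache.storm.hdfs.spout in case DirLock is package private:

    package org.apache.storm.hdfs.spout;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DirLockSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration()); // hypothetical HDFS handle
        Path dir = new Path("/tmp/lockdir");                 // directory guarded by the lock

        DirLock lock = DirLock.tryLock(fs, dir);             // null => someone else holds it
        if (lock == null) {
          System.out.println("Lock busy; retry later");
          return;
        }
        try {
          // ... single-owner work on the directory goes here ...
        } finally {
          lock.release();                                    // let waiting peers proceed
        }
      }
    }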