Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:05 UTC

[14/20] storm git commit: Fixing bugs related to switching to the next file: setting fileReadCompletely=true/false and reader=null for ACK-mode reading. Added UTs. Incorporated review comments from Satish and others

Fixing bugs related to switching to the next file: setting fileReadCompletely=true/false and reader=null for ACK-mode reading. Added UTs. Incorporated review comments from Satish and others


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e52f083
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e52f083
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e52f083

Branch: refs/heads/master
Commit: 1e52f0837aed03cc47b86b1e02037b6136c8c8b0
Parents: 152856d
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Mon Dec 21 20:22:03 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800

----------------------------------------------------------------------
 .../hdfs/common/CmpFilesByModificationTime.java |  32 -----
 .../org/apache/storm/hdfs/common/HdfsUtils.java |   4 +-
 .../storm/hdfs/common/ModifTimeComparator.java  |  32 +++++
 .../storm/hdfs/spout/AbstractFileReader.java    |   2 -
 .../org/apache/storm/hdfs/spout/FileLock.java   |  17 ++-
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  | 101 ++++++--------
 .../apache/storm/hdfs/spout/TextFileReader.java |  19 +--
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 134 +++++++++++++++----
 8 files changed, 207 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
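
For context, the core of the fix in HdfsSpout.java (full diff below) is that, in ACK mode,
nextTuple() stops emitting once the current file is exhausted, and the switch to the next
file happens only in ack(), after every in-flight tuple has been ACKed. A condensed,
standalone sketch of that flow follows; the reader and message-id details are simplified
stand-ins here, not the real storm-hdfs types.

import java.util.HashSet;
import java.util.Set;

// Simplified model of the ACK-mode file-switch logic implemented in the
// HdfsSpout.java hunks below; not the actual spout code.
class AckModeFileSwitchSketch {
  private Object reader = null;              // null => no file currently open
  private boolean fileReadCompletely = true; // true until a new file is picked
  private final Set<Integer> inflight = new HashSet<>(); // un-ACKed message ids

  // Mirrors nextTuple(): open a file if needed, stop emitting once the
  // current file is exhausted, otherwise emit and track the message id.
  void nextTuple() {
    if (reader == null) {
      reader = pickNextFile();
      fileReadCompletely = false;            // fresh file, nothing read yet
      if (reader == null) {
        return;                              // no new files to process
      }
    }
    if (fileReadCompletely) {
      return;                                // EOF reached, wait for outstanding ACKs
    }
    Integer msgId = readNextRecord();        // stand-in for reader.next() + emit
    if (msgId != null) {
      inflight.add(msgId);                   // remember until ACKed
    } else {
      fileReadCompletely = true;             // EOF: defer the file switch to ack()
    }
  }

  // Mirrors ack(): only after EOF *and* all in-flight tuples are ACKed is the
  // file archived and the reader released, letting nextTuple() move on.
  void ack(Integer msgId) {
    inflight.remove(msgId);
    if (fileReadCompletely && inflight.isEmpty()) {
      markFileAsDone();                      // archive file + clear trackers
      reader = null;
    }
  }

  // Trivial stand-ins so the sketch compiles on its own.
  private Object pickNextFile() { return new Object(); }
  private Integer readNextRecord() { return null; }
  private void markFileAsDone() { inflight.clear(); }
}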


http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
deleted file mode 100644
index 67420aa..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hdfs.common;
-
-import org.apache.hadoop.fs.FileStatus;
-
-import java.util.Comparator;
-
-
-public class CmpFilesByModificationTime
-        implements Comparator<FileStatus> {
-   @Override
-    public int compare(FileStatus o1, FileStatus o2) {
-      return new Long(o1.getModificationTime()).compareTo( o1.getModificationTime() );
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index e8df78d..86b9ee8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -49,7 +49,7 @@ public class HdfsUtils {
         fstats.add(fileStatus);
       }
     }
-    Collections.sort(fstats, new CmpFilesByModificationTime() );
+    Collections.sort(fstats, new ModifTimeComparator() );
 
     ArrayList<Path> result = new ArrayList<>(fstats.size());
     for (LocatedFileStatus fstat : fstats) {
@@ -59,7 +59,7 @@ public class HdfsUtils {
   }
 
   /**
-   * Returns true if succeeded. False if file already exists. throws if there was unexpected problem
+   * Returns null if file already exists. throws if there was unexpected problem
    */
   public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
new file mode 100644
index 0000000..de5613e
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.Comparator;
+
+
+public class ModifTimeComparator
+        implements Comparator<FileStatus> {
+   @Override
+    public int compare(FileStatus o1, FileStatus o2) {
+      return new Long(o1.getModificationTime()).compareTo( o2.getModificationTime() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
index 09dc0d3..6efea81 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -26,13 +26,11 @@ import org.apache.hadoop.fs.Path;
 abstract class AbstractFileReader implements FileReader {
 
   private final Path file;
-  private final FileSystem fs;
   private Fields fields;
 
   public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) {
     if (fs == null || file == null)
       throw new IllegalArgumentException("file and filesystem args cannot be null");
-    this.fs = fs;
     this.file = file;
     this.fields = fieldNames;
   }

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index b40d1dd..89ed855 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -48,7 +48,7 @@ public class FileLock {
   private final FSDataOutputStream lockFileStream;
   private LogEntry lastEntry;
 
-  private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+  private static final Logger log = LoggerFactory.getLogger(FileLock.class);
 
   private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId)
           throws IOException {
@@ -89,9 +89,15 @@ public class FileLock {
     lastEntry = entry; // update this only after writing to hdfs
   }
 
+  /** Release lock by deleting file
+   * @throws IOException if lock file could not be deleted
+   */
   public void release() throws IOException {
     lockFileStream.close();
-    fs.delete(lockFile, false);
+    if(!fs.delete(lockFile, false)){
+      log.warn("Unable to delete lock file");
+      throw new IOException("Unable to delete lock file");
+    }
     log.debug("Released lock file {}", lockFile);
   }
 
@@ -109,10 +115,10 @@ public class FileLock {
     try {
       FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
       if (ostream != null) {
-        log.info("Acquired lock on file {}. LockFile=", fileToLock, lockFile);
+        log.debug("Acquired lock on file {}. LockFile=", fileToLock, lockFile);
         return new FileLock(fs, lockFile, ostream, spoutId);
       } else {
-        log.info("Cannot lock file {} as its already locked.", fileToLock);
+        log.debug("Cannot lock file {} as its already locked.", fileToLock);
         return null;
       }
     } catch (IOException e) {
@@ -166,7 +172,6 @@ public class FileLock {
     return LogEntry.deserialize(lastLine);
   }
 
-  // takes ownership of the lock file
   /**
    * Takes ownership of the lock file if possible.
    * @param lockFile
@@ -184,7 +189,7 @@ public class FileLock {
       return new FileLock(fs, lockFile, spoutId, lastEntry);
     } catch (RemoteException e) {
       if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
-        log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile);
+        log.warn("Lock file {} is currently open. Cannot transfer ownership now. Will try later.", lockFile);
         return null;
       } else { // unexpected error
         throw e;

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 50c2172..3d95ea7 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -86,7 +86,7 @@ public class HdfsSpout extends BaseRichSpout {
   private int acksSinceLastCommit = 0 ;
   private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
   private final Timer commitTimer = new Timer();
-  private boolean fileReadCompletely = false;
+  private boolean fileReadCompletely = true;
 
   private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs
 
@@ -130,12 +130,15 @@ public class HdfsSpout extends BaseRichSpout {
         // 3) Select a new file if one is not open already
         if (reader == null) {
           reader = pickNextFile();
+          fileReadCompletely=false;
           if (reader == null) {
             LOG.debug("Currently no new files to process under : " + sourceDirPath);
             return;
           }
         }
-
+        if( fileReadCompletely ) { // wait for more ACKs before proceeding
+          return;
+        }
         // 4) Read record from file, emit to collector and record progress
         List<Object> tuple = reader.next();
         if (tuple != null) {
@@ -145,7 +148,7 @@ public class HdfsSpout extends BaseRichSpout {
           emitData(tuple, msgId);
 
           if(!ackEnabled) {
-            ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode
+            ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode
             commitProgress(reader.getFileOffset());
           } else {
             commitProgress(tracker.getCommitPosition());
@@ -175,6 +178,8 @@ public class HdfsSpout extends BaseRichSpout {
 
   // will commit progress into lock file if commit threshold is reached
   private void commitProgress(FileOffset position) {
+    if(position==null)
+      return;
     if ( lock!=null && canCommitNow() ) {
       try {
         lock.heartbeat(position.toString());
@@ -205,15 +210,13 @@ public class HdfsSpout extends BaseRichSpout {
   }
 
   private void markFileAsDone(Path filePath) {
-    fileReadCompletely = false;
     try {
       Path newFile = renameCompletedFile(reader.getFilePath());
       LOG.info("Completed processing {}", newFile);
     } catch (IOException e) {
       LOG.error("Unable to archive completed file" + filePath, e);
     }
-    unlockAndCloseReader();
-
+    closeReaderAndResetTrackers();
   }
 
   private void markFileAsBad(Path file) {
@@ -222,19 +225,22 @@ public class HdfsSpout extends BaseRichSpout {
     String originalName = new Path(fileNameMinusSuffix).getName();
     Path  newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
 
-    LOG.info("Moving bad file {} to {} ", originalName, newFile);
+    LOG.info("Moving bad file {} to {}. Processed it till offset {}", originalName, newFile, tracker.getCommitPosition());
     try {
       if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
         throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
       }
     } catch (IOException e) {
-      LOG.warn("Error moving bad file: " + file + ". to destination :  " + newFile);
+      LOG.warn("Error moving bad file: " + file + " to destination " + newFile, e);
     }
-
-    unlockAndCloseReader();
+    closeReaderAndResetTrackers();
   }
 
-  private void unlockAndCloseReader() {
+  private void closeReaderAndResetTrackers() {
+    inflight.clear();
+    tracker.offsets.clear();
+    retryList.clear();
+
     reader.close();
     reader = null;
     try {
@@ -245,8 +251,6 @@ public class HdfsSpout extends BaseRichSpout {
     lock = null;
   }
 
-
-
   protected void emitData(List<Object> tuple, MessageId id) {
     LOG.debug("Emitting - {}", id);
     this.collector.emit(tuple, id);
@@ -306,21 +310,7 @@ public class HdfsSpout extends BaseRichSpout {
       throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
     }
     this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() );
-
-    try {
-      if(hdfs.exists(archiveDirPath)) {
-        if(! hdfs.isDirectory(archiveDirPath) ) {
-          LOG.error("Archive directory is a file. " + archiveDirPath);
-          throw new RuntimeException("Archive directory is a file. " + archiveDirPath);
-        }
-      } else if(! hdfs.mkdirs(archiveDirPath) ) {
-        LOG.error("Unable to create archive directory. " + archiveDirPath);
-        throw new RuntimeException("Unable to create archive directory " + archiveDirPath);
-      }
-    } catch (IOException e) {
-      LOG.error("Unable to create archive dir ", e);
-      throw new RuntimeException("Unable to create archive directory ", e);
-    }
+    validateOrMakeDir(hdfs, archiveDirPath, "Archive");
 
     // -- bad files dir config
     if ( !conf.containsKey(Configs.BAD_DIR) ) {
@@ -329,23 +319,9 @@ public class HdfsSpout extends BaseRichSpout {
     }
 
     this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString());
+    validateOrMakeDir(hdfs, badFilesDirPath, "bad files");
 
-    try {
-      if(hdfs.exists(badFilesDirPath)) {
-        if(! hdfs.isDirectory(badFilesDirPath) ) {
-          LOG.error("Bad files directory is a file: " + badFilesDirPath);
-          throw new RuntimeException("Bad files directory is a file: " + badFilesDirPath);
-        }
-      } else if(! hdfs.mkdirs(badFilesDirPath) ) {
-        LOG.error("Unable to create directory for bad files: " + badFilesDirPath);
-        throw new RuntimeException("Unable to create a directory for bad files: " + badFilesDirPath);
-      }
-    } catch (IOException e) {
-      LOG.error("Unable to create archive dir ", e);
-      throw new RuntimeException(e.getMessage(), e);
-    }
-
-    // -- ignore filename suffix
+            // -- ignore filename suffix
     if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) {
       this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString();
     }
@@ -353,21 +329,7 @@ public class HdfsSpout extends BaseRichSpout {
     // -- lock dir config
     String lockDir = !conf.containsKey(Configs.LOCK_DIR) ? getDefaultLockDir(sourceDirPath) : conf.get(Configs.LOCK_DIR).toString() ;
     this.lockDirPath = new Path(lockDir);
-
-    try {
-      if(hdfs.exists(lockDirPath)) {
-        if(! hdfs.isDirectory(lockDirPath) ) {
-          LOG.error("Lock directory is a file: " + lockDirPath);
-          throw new RuntimeException("Lock directory is a file: " + lockDirPath);
-        }
-      } else if(! hdfs.mkdirs(lockDirPath) ) {
-        LOG.error("Unable to create lock directory: " + lockDirPath);
-        throw new RuntimeException("Unable to create lock directory: " + lockDirPath);
-      }
-    } catch (IOException e) {
-      LOG.error("Unable to create lock dir: " + lockDirPath, e);
-      throw new RuntimeException(e.getMessage(), e);
-    }
+    validateOrMakeDir(hdfs,lockDirPath,"locks");
 
     // -- lock timeout
     if( conf.get(Configs.LOCK_TIMEOUT) !=null )
@@ -403,6 +365,23 @@ public class HdfsSpout extends BaseRichSpout {
     setupCommitElapseTimer();
   }
 
+  private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) {
+    try {
+      if(fs.exists(dir)) {
+        if(! fs.isDirectory(dir) ) {
+          LOG.error(dirDescription + " directory is a file, not a dir. " + dir);
+          throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir);
+        }
+      } else if(! fs.mkdirs(dir) ) {
+        LOG.error("Unable to create " + dirDescription + " directory " + dir);
+        throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to create " + dirDescription + " directory " + dir, e);
+      throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e);
+    }
+  }
+
   private String getDefaultLockDir(Path sourceDirPath) {
     return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
   }
@@ -425,12 +404,14 @@ public class HdfsSpout extends BaseRichSpout {
 
   @Override
   public void ack(Object msgId) {
+    if(!ackEnabled)
+      throw new IllegalStateException("Received an ACK when ack-ing is disabled");
     MessageId id = (MessageId) msgId;
     inflight.remove(id);
     ++acksSinceLastCommit;
     tracker.recordAckedOffset(id.offset);
     commitProgress(tracker.getCommitPosition());
-    if(fileReadCompletely) {
+    if(fileReadCompletely && inflight.isEmpty()) {
       markFileAsDone(reader.getFilePath());
       reader = null;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index 6e4a8b0..b998d30 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -46,19 +46,20 @@ class TextFileReader extends AbstractFileReader {
   private TextFileReader.Offset offset;
 
   public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException {
-    super(fs, file, new Fields(DEFAULT_FIELD_NAME));
-    FSDataInputStream in = fs.open(file);
-    String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
-    int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
-    reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
-    offset = new TextFileReader.Offset(0,0);
+    this(fs, file, conf, new TextFileReader.Offset(0,0) );
   }
 
   public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException {
+    this(fs, file, conf, new TextFileReader.Offset(startOffset) );
+  }
+
+  private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset) throws IOException {
     super(fs, file, new Fields(DEFAULT_FIELD_NAME));
-    offset = new TextFileReader.Offset(startOffset);
+    offset = startOffset;
     FSDataInputStream in = fs.open(file);
-    in.seek(offset.byteOffset);
+    if(offset.byteOffset>0)
+      in.seek(offset.byteOffset);
+
     String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
     int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
     reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
@@ -97,6 +98,8 @@ class TextFileReader extends AbstractFileReader {
     }
 
     public Offset(String offset) {
+      if(offset==null)
+        throw new IllegalArgumentException("offset cannot be null");
       try {
         String[] parts = offset.split(":");
         this.byteOffset = Long.parseLong(parts[0].split("=")[1]);

http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 98d21f8..f64400a 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -18,6 +18,7 @@
 
 package org.apache.storm.hdfs.spout;
 
+import backtype.storm.Config;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -48,6 +49,7 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -109,31 +111,51 @@ public class TestHdfsSpout {
   }
 
   @Test
-  public void testSimpleText() throws IOException {
+  public void testSimpleText_noACK() throws IOException {
     Path file1 = new Path(source.toString() + "/file1.txt");
     createTextFile(file1, 5);
 
     Path file2 = new Path(source.toString() + "/file2.txt");
     createTextFile(file2, 5);
 
-    listDir(source);
-
     Map conf = getDefaultConfig();
     conf.put(Configs.COMMIT_FREQ_COUNT, "1");
     conf.put(Configs.COMMIT_FREQ_SEC, "1");
+
     HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
 
-    List<String> res = runSpout(spout,"r11", "a0", "a1", "a2", "a3", "a4");
-    for (String re : res) {
-      System.err.println(re);
-    }
+    runSpout(spout,"r11");
 
-    listCompletedDir();
     Path arc1 = new Path(archive.toString() + "/file1.txt");
     Path arc2 = new Path(archive.toString() + "/file2.txt");
     checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
   }
 
+  @Test
+  public void testSimpleText_ACK() throws IOException {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 5);
+
+    Path file2 = new Path(source.toString() + "/file2.txt");
+    createTextFile(file2, 5);
+
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1");
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable acking
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+    // consume file 1
+    runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4");
+    Path arc1 = new Path(archive.toString() + "/file1.txt");
+    checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);
+
+    // consume file 2
+    runSpout(spout, "r6", "a5", "a6", "a7", "a8", "a9");
+    Path arc2 = new Path(archive.toString() + "/file2.txt");
+    checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+  }
+
   private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
     ArrayList<String> expected = new ArrayList<>();
     for (Path txtFile : txtFiles) {
@@ -190,11 +212,6 @@ public class TestHdfsSpout {
     return result;
   }
 
-  private void listCompletedDir() throws IOException {
-    listDir(source);
-    listDir(archive);
-  }
-
   private List<String> listDir(Path p) throws IOException {
     ArrayList<String> result = new ArrayList<>();
     System.err.println("*** Listing " + p);
@@ -209,28 +226,97 @@ public class TestHdfsSpout {
 
 
   @Test
-  public void testSimpleSequenceFile() throws IOException {
+  public void testMultipleFileConsumption_Ack() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 5);
+
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1");
+    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+    // read few lines from file1 dont ack
+    runSpout(spout, "r3");
+    FileReader reader = getField(spout, "reader");
+    Assert.assertNotNull(reader);
+    Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely"));
+
+    // read remaining lines
+    runSpout(spout, "r3");
+    reader = getField(spout, "reader");
+    Assert.assertNotNull(reader);
+    Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely") );
+
+    // ack few
+    runSpout(spout, "a0", "a1", "a2");
+    reader = getField(spout, "reader");
+    Assert.assertNotNull(reader);
+    Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely"));
+
+    //ack rest
+    runSpout(spout, "a3", "a4");
+    reader = getField(spout, "reader");
+    Assert.assertNull(reader);
+    Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely"));
+
+
+    // go to next file
+    Path file2 = new Path(source.toString() + "/file2.txt");
+    createTextFile(file2, 5);
+
+    // Read 1 line
+    runSpout(spout, "r1");
+    Assert.assertNotNull(getField(spout, "reader"));
+    Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely"));
 
+    // ack 1 tuple
+    runSpout(spout, "a5");
+    Assert.assertNotNull(getField(spout, "reader"));
+    Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely"));
+
+
+    // read and ack remaining lines
+    runSpout(spout, "r5", "a6", "a7", "a8", "a9");
+    Assert.assertNull(getField(spout, "reader"));
+    Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely"));
+  }
+
+  private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+    Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
+    readerFld.setAccessible(true);
+    return (T) readerFld.get(spout);
+  }
+
+  private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+    Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
+    readerFld.setAccessible(true);
+    return readerFld.getBoolean(spout);
+  }
+
+
+  @Test
+  public void testSimpleSequenceFile() throws IOException {
+    //1) create a couple files to consume
     source = new Path("/tmp/hdfsspout/source");
     fs.mkdirs(source);
     archive = new Path("/tmp/hdfsspout/archive");
     fs.mkdirs(archive);
 
     Path file1 = new Path(source + "/file1.seq");
-    createSeqFile(fs, file1);
+    createSeqFile(fs, file1, 5);
 
     Path file2 = new Path(source + "/file2.seq");
-    createSeqFile(fs, file2);
+    createSeqFile(fs, file2, 5);
 
     Map conf = getDefaultConfig();
     HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
 
-    List<String> res = runSpout(spout, "r11", "a0", "a1", "a2", "a3", "a4");
-    for (String re : res) {
-      System.err.println(re);
-    }
+    // consume both files
+    List<String> res = runSpout(spout, "r11");
+    Assert.assertEquals(10, res.size());
 
-    listDir(archive);
+    Assert.assertEquals(2, listDir(archive).size());
 
 
     Path f1 = new Path(archive + "/file1.seq");
@@ -401,7 +487,7 @@ public class TestHdfsSpout {
    * fN - fail, item number: N
    */
 
-  private List<String> runSpout(HdfsSpout spout,  String...  cmds) {
+  private List<String> runSpout(HdfsSpout spout, String...  cmds) {
     MockCollector collector = (MockCollector) spout.getCollector();
       for(String cmd : cmds) {
         if(cmd.startsWith("r")) {
@@ -437,7 +523,7 @@ public class TestHdfsSpout {
 
 
 
-  private static void createSeqFile(FileSystem fs, Path file) throws IOException {
+  private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException {
 
     Configuration conf = new Configuration();
     try {
@@ -446,7 +532,7 @@ public class TestHdfsSpout {
       }
 
       SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class );
-      for (int i = 0; i < 5; i++) {
+      for (int i = 0; i < rowCount; i++) {
         w.append(new IntWritable(i), new Text("line " + i));
       }
       w.close();