You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2016/01/14 21:56:05 UTC
[14/20] storm git commit: Fixing bugs related to switching to next
file in setting fileReadCompletely=true/false and reader=null for ACK mode
reading. Added UTs. Incorporated review comments from Satish and others
Fixing bugs related to switching to next file in setting fileReadCompletely=true/false and reader=null for ACK mode reading. Added UTs. Incorporated review comments from Satish and others
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e52f083
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e52f083
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e52f083
Branch: refs/heads/master
Commit: 1e52f0837aed03cc47b86b1e02037b6136c8c8b0
Parents: 152856d
Author: Roshan Naik <ro...@hortonworks.com>
Authored: Mon Dec 21 20:22:03 2015 -0800
Committer: Roshan Naik <ro...@hortonworks.com>
Committed: Thu Jan 14 11:34:56 2016 -0800
----------------------------------------------------------------------
.../hdfs/common/CmpFilesByModificationTime.java | 32 -----
.../org/apache/storm/hdfs/common/HdfsUtils.java | 4 +-
.../storm/hdfs/common/ModifTimeComparator.java | 32 +++++
.../storm/hdfs/spout/AbstractFileReader.java | 2 -
.../org/apache/storm/hdfs/spout/FileLock.java | 17 ++-
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 101 ++++++--------
.../apache/storm/hdfs/spout/TextFileReader.java | 19 +--
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 134 +++++++++++++++----
8 files changed, 207 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
deleted file mode 100644
index 67420aa..0000000
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/CmpFilesByModificationTime.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hdfs.common;
-
-import org.apache.hadoop.fs.FileStatus;
-
-import java.util.Comparator;
-
-
-public class CmpFilesByModificationTime
- implements Comparator<FileStatus> {
- @Override
- public int compare(FileStatus o1, FileStatus o2) {
- return new Long(o1.getModificationTime()).compareTo( o1.getModificationTime() );
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
index e8df78d..86b9ee8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HdfsUtils.java
@@ -49,7 +49,7 @@ public class HdfsUtils {
fstats.add(fileStatus);
}
}
- Collections.sort(fstats, new CmpFilesByModificationTime() );
+ Collections.sort(fstats, new ModifTimeComparator() );
ArrayList<Path> result = new ArrayList<>(fstats.size());
for (LocatedFileStatus fstat : fstats) {
@@ -59,7 +59,7 @@ public class HdfsUtils {
}
/**
- * Returns true if succeeded. False if file already exists. throws if there was unexpected problem
+ * Returns null if file already exists. throws if there was unexpected problem
*/
public static FSDataOutputStream tryCreateFile(FileSystem fs, Path file) throws IOException {
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
new file mode 100644
index 0000000..de5613e
--- /dev/null
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/ModifTimeComparator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.common;
+
+import org.apache.hadoop.fs.FileStatus;
+
+import java.util.Comparator;
+
+
+public class ModifTimeComparator
+ implements Comparator<FileStatus> {
+ @Override
+ public int compare(FileStatus o1, FileStatus o2) {
+ return new Long(o1.getModificationTime()).compareTo( o1.getModificationTime() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
index 09dc0d3..6efea81 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/AbstractFileReader.java
@@ -26,13 +26,11 @@ import org.apache.hadoop.fs.Path;
abstract class AbstractFileReader implements FileReader {
private final Path file;
- private final FileSystem fs;
private Fields fields;
public AbstractFileReader(FileSystem fs, Path file, Fields fieldNames) {
if (fs == null || file == null)
throw new IllegalArgumentException("file and filesystem args cannot be null");
- this.fs = fs;
this.file = file;
this.fields = fieldNames;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
index b40d1dd..89ed855 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/FileLock.java
@@ -48,7 +48,7 @@ public class FileLock {
private final FSDataOutputStream lockFileStream;
private LogEntry lastEntry;
- private static final Logger log = LoggerFactory.getLogger(DirLock.class);
+ private static final Logger log = LoggerFactory.getLogger(FileLock.class);
private FileLock(FileSystem fs, Path lockFile, FSDataOutputStream lockFileStream, String spoutId)
throws IOException {
@@ -89,9 +89,15 @@ public class FileLock {
lastEntry = entry; // update this only after writing to hdfs
}
+ /** Release lock by deleting file
+ * @throws IOException if lock file could not be deleted
+ */
public void release() throws IOException {
lockFileStream.close();
- fs.delete(lockFile, false);
+ if(!fs.delete(lockFile, false)){
+ log.warn("Unable to delete lock file");
+ throw new IOException("Unable to delete lock file");
+ }
log.debug("Released lock file {}", lockFile);
}
@@ -109,10 +115,10 @@ public class FileLock {
try {
FSDataOutputStream ostream = HdfsUtils.tryCreateFile(fs, lockFile);
if (ostream != null) {
- log.info("Acquired lock on file {}. LockFile=", fileToLock, lockFile);
+ log.debug("Acquired lock on file {}. LockFile=", fileToLock, lockFile);
return new FileLock(fs, lockFile, ostream, spoutId);
} else {
- log.info("Cannot lock file {} as its already locked.", fileToLock);
+ log.debug("Cannot lock file {} as its already locked.", fileToLock);
return null;
}
} catch (IOException e) {
@@ -166,7 +172,6 @@ public class FileLock {
return LogEntry.deserialize(lastLine);
}
- // takes ownership of the lock file
/**
* Takes ownership of the lock file if possible.
* @param lockFile
@@ -184,7 +189,7 @@ public class FileLock {
return new FileLock(fs, lockFile, spoutId, lastEntry);
} catch (RemoteException e) {
if (e.unwrapRemoteException() instanceof AlreadyBeingCreatedException) {
- log.info("Lock file {} is currently open. Cannot transfer ownership.", lockFile);
+ log.warn("Lock file {} is currently open. Cannot transfer ownership now. Will try later.", lockFile);
return null;
} else { // unexpected error
throw e;
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 50c2172..3d95ea7 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -86,7 +86,7 @@ public class HdfsSpout extends BaseRichSpout {
private int acksSinceLastCommit = 0 ;
private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
private final Timer commitTimer = new Timer();
- private boolean fileReadCompletely = false;
+ private boolean fileReadCompletely = true;
private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs kerberos configs
@@ -130,12 +130,15 @@ public class HdfsSpout extends BaseRichSpout {
// 3) Select a new file if one is not open already
if (reader == null) {
reader = pickNextFile();
+ fileReadCompletely=false;
if (reader == null) {
LOG.debug("Currently no new files to process under : " + sourceDirPath);
return;
}
}
-
+ if( fileReadCompletely ) { // wait for more ACKs before proceeding
+ return;
+ }
// 4) Read record from file, emit to collector and record progress
List<Object> tuple = reader.next();
if (tuple != null) {
@@ -145,7 +148,7 @@ public class HdfsSpout extends BaseRichSpout {
emitData(tuple, msgId);
if(!ackEnabled) {
- ++acksSinceLastCommit; // assume message is immediately acked in non-ack mode
+ ++acksSinceLastCommit; // assume message is immediately ACKed in non-ack mode
commitProgress(reader.getFileOffset());
} else {
commitProgress(tracker.getCommitPosition());
@@ -175,6 +178,8 @@ public class HdfsSpout extends BaseRichSpout {
// will commit progress into lock file if commit threshold is reached
private void commitProgress(FileOffset position) {
+ if(position==null)
+ return;
if ( lock!=null && canCommitNow() ) {
try {
lock.heartbeat(position.toString());
@@ -205,15 +210,13 @@ public class HdfsSpout extends BaseRichSpout {
}
private void markFileAsDone(Path filePath) {
- fileReadCompletely = false;
try {
Path newFile = renameCompletedFile(reader.getFilePath());
LOG.info("Completed processing {}", newFile);
} catch (IOException e) {
LOG.error("Unable to archive completed file" + filePath, e);
}
- unlockAndCloseReader();
-
+ closeReaderAndResetTrackers();
}
private void markFileAsBad(Path file) {
@@ -222,19 +225,22 @@ public class HdfsSpout extends BaseRichSpout {
String originalName = new Path(fileNameMinusSuffix).getName();
Path newFile = new Path( badFilesDirPath + Path.SEPARATOR + originalName);
- LOG.info("Moving bad file {} to {} ", originalName, newFile);
+ LOG.info("Moving bad file {} to {}. Processed it till offset {}", originalName, newFile, tracker.getCommitPosition());
try {
if (!hdfs.rename(file, newFile) ) { // seems this can fail by returning false or throwing exception
throw new IOException("Move failed for bad file: " + file); // convert false ret value to exception
}
} catch (IOException e) {
- LOG.warn("Error moving bad file: " + file + ". to destination : " + newFile);
+ LOG.warn("Error moving bad file: " + file + " to destination " + newFile, e);
}
-
- unlockAndCloseReader();
+ closeReaderAndResetTrackers();
}
- private void unlockAndCloseReader() {
+ private void closeReaderAndResetTrackers() {
+ inflight.clear();
+ tracker.offsets.clear();
+ retryList.clear();
+
reader.close();
reader = null;
try {
@@ -245,8 +251,6 @@ public class HdfsSpout extends BaseRichSpout {
lock = null;
}
-
-
protected void emitData(List<Object> tuple, MessageId id) {
LOG.debug("Emitting - {}", id);
this.collector.emit(tuple, id);
@@ -306,21 +310,7 @@ public class HdfsSpout extends BaseRichSpout {
throw new RuntimeException(Configs.ARCHIVE_DIR + " setting is required");
}
this.archiveDirPath = new Path( conf.get(Configs.ARCHIVE_DIR).toString() );
-
- try {
- if(hdfs.exists(archiveDirPath)) {
- if(! hdfs.isDirectory(archiveDirPath) ) {
- LOG.error("Archive directory is a file. " + archiveDirPath);
- throw new RuntimeException("Archive directory is a file. " + archiveDirPath);
- }
- } else if(! hdfs.mkdirs(archiveDirPath) ) {
- LOG.error("Unable to create archive directory. " + archiveDirPath);
- throw new RuntimeException("Unable to create archive directory " + archiveDirPath);
- }
- } catch (IOException e) {
- LOG.error("Unable to create archive dir ", e);
- throw new RuntimeException("Unable to create archive directory ", e);
- }
+ validateOrMakeDir(hdfs, archiveDirPath, "Archive");
// -- bad files dir config
if ( !conf.containsKey(Configs.BAD_DIR) ) {
@@ -329,23 +319,9 @@ public class HdfsSpout extends BaseRichSpout {
}
this.badFilesDirPath = new Path(conf.get(Configs.BAD_DIR).toString());
+ validateOrMakeDir(hdfs, badFilesDirPath, "bad files");
- try {
- if(hdfs.exists(badFilesDirPath)) {
- if(! hdfs.isDirectory(badFilesDirPath) ) {
- LOG.error("Bad files directory is a file: " + badFilesDirPath);
- throw new RuntimeException("Bad files directory is a file: " + badFilesDirPath);
- }
- } else if(! hdfs.mkdirs(badFilesDirPath) ) {
- LOG.error("Unable to create directory for bad files: " + badFilesDirPath);
- throw new RuntimeException("Unable to create a directory for bad files: " + badFilesDirPath);
- }
- } catch (IOException e) {
- LOG.error("Unable to create archive dir ", e);
- throw new RuntimeException(e.getMessage(), e);
- }
-
- // -- ignore filename suffix
+ // -- ignore filename suffix
if ( conf.containsKey(Configs.IGNORE_SUFFIX) ) {
this.ignoreSuffix = conf.get(Configs.IGNORE_SUFFIX).toString();
}
@@ -353,21 +329,7 @@ public class HdfsSpout extends BaseRichSpout {
// -- lock dir config
String lockDir = !conf.containsKey(Configs.LOCK_DIR) ? getDefaultLockDir(sourceDirPath) : conf.get(Configs.LOCK_DIR).toString() ;
this.lockDirPath = new Path(lockDir);
-
- try {
- if(hdfs.exists(lockDirPath)) {
- if(! hdfs.isDirectory(lockDirPath) ) {
- LOG.error("Lock directory is a file: " + lockDirPath);
- throw new RuntimeException("Lock directory is a file: " + lockDirPath);
- }
- } else if(! hdfs.mkdirs(lockDirPath) ) {
- LOG.error("Unable to create lock directory: " + lockDirPath);
- throw new RuntimeException("Unable to create lock directory: " + lockDirPath);
- }
- } catch (IOException e) {
- LOG.error("Unable to create lock dir: " + lockDirPath, e);
- throw new RuntimeException(e.getMessage(), e);
- }
+ validateOrMakeDir(hdfs,lockDirPath,"locks");
// -- lock timeout
if( conf.get(Configs.LOCK_TIMEOUT) !=null )
@@ -403,6 +365,23 @@ public class HdfsSpout extends BaseRichSpout {
setupCommitElapseTimer();
}
+ private static void validateOrMakeDir(FileSystem fs, Path dir, String dirDescription) {
+ try {
+ if(fs.exists(dir)) {
+ if(! fs.isDirectory(dir) ) {
+ LOG.error(dirDescription + " directory is a file, not a dir. " + dir);
+ throw new RuntimeException(dirDescription + " directory is a file, not a dir. " + dir);
+ }
+ } else if(! fs.mkdirs(dir) ) {
+ LOG.error("Unable to create " + dirDescription + " directory " + dir);
+ throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to create " + dirDescription + " directory " + dir, e);
+ throw new RuntimeException("Unable to create " + dirDescription + " directory " + dir, e);
+ }
+ }
+
private String getDefaultLockDir(Path sourceDirPath) {
return sourceDirPath.toString() + Path.SEPARATOR + Configs.DEFAULT_LOCK_DIR;
}
@@ -425,12 +404,14 @@ public class HdfsSpout extends BaseRichSpout {
@Override
public void ack(Object msgId) {
+ if(!ackEnabled)
+ throw new IllegalStateException("Received an ACKs when ack-ing is disabled" );
MessageId id = (MessageId) msgId;
inflight.remove(id);
++acksSinceLastCommit;
tracker.recordAckedOffset(id.offset);
commitProgress(tracker.getCommitPosition());
- if(fileReadCompletely) {
+ if(fileReadCompletely && inflight.isEmpty()) {
markFileAsDone(reader.getFilePath());
reader = null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
index 6e4a8b0..b998d30 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java
@@ -46,19 +46,20 @@ class TextFileReader extends AbstractFileReader {
private TextFileReader.Offset offset;
public TextFileReader(FileSystem fs, Path file, Map conf) throws IOException {
- super(fs, file, new Fields(DEFAULT_FIELD_NAME));
- FSDataInputStream in = fs.open(file);
- String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
- int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
- reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
- offset = new TextFileReader.Offset(0,0);
+ this(fs, file, conf, new TextFileReader.Offset(0,0) );
}
public TextFileReader(FileSystem fs, Path file, Map conf, String startOffset) throws IOException {
+ this(fs, file, conf, new TextFileReader.Offset(startOffset) );
+ }
+
+ private TextFileReader(FileSystem fs, Path file, Map conf, TextFileReader.Offset startOffset) throws IOException {
super(fs, file, new Fields(DEFAULT_FIELD_NAME));
- offset = new TextFileReader.Offset(startOffset);
+ offset = startOffset;
FSDataInputStream in = fs.open(file);
- in.seek(offset.byteOffset);
+ if(offset.byteOffset>0)
+ in.seek(offset.byteOffset);
+
String charSet = (conf==null || !conf.containsKey(CHARSET) ) ? "UTF-8" : conf.get(CHARSET).toString();
int buffSz = (conf==null || !conf.containsKey(BUFFER_SIZE) ) ? DEFAULT_BUFF_SIZE : Integer.parseInt( conf.get(BUFFER_SIZE).toString() );
reader = new BufferedReader(new InputStreamReader(in, charSet), buffSz);
@@ -97,6 +98,8 @@ class TextFileReader extends AbstractFileReader {
}
public Offset(String offset) {
+ if(offset!=null)
+ throw new IllegalArgumentException("offset cannot be null");
try {
String[] parts = offset.split(":");
this.byteOffset = Long.parseLong(parts[0].split("=")[1]);
http://git-wip-us.apache.org/repos/asf/storm/blob/1e52f083/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 98d21f8..f64400a 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -18,6 +18,7 @@
package org.apache.storm.hdfs.spout;
+import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -48,6 +49,7 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -109,31 +111,51 @@ public class TestHdfsSpout {
}
@Test
- public void testSimpleText() throws IOException {
+ public void testSimpleText_noACK() throws IOException {
Path file1 = new Path(source.toString() + "/file1.txt");
createTextFile(file1, 5);
Path file2 = new Path(source.toString() + "/file2.txt");
createTextFile(file2, 5);
- listDir(source);
-
Map conf = getDefaultConfig();
conf.put(Configs.COMMIT_FREQ_COUNT, "1");
conf.put(Configs.COMMIT_FREQ_SEC, "1");
+
HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
- List<String> res = runSpout(spout,"r11", "a0", "a1", "a2", "a3", "a4");
- for (String re : res) {
- System.err.println(re);
- }
+ runSpout(spout,"r11");
- listCompletedDir();
Path arc1 = new Path(archive.toString() + "/file1.txt");
Path arc2 = new Path(archive.toString() + "/file2.txt");
checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
}
+ @Test
+ public void testSimpleText_ACK() throws IOException {
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 5);
+
+ Path file2 = new Path(source.toString() + "/file2.txt");
+ createTextFile(file2, 5);
+
+ Map conf = getDefaultConfig();
+ conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+ conf.put(Configs.COMMIT_FREQ_SEC, "1");
+ conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable acking
+ HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+ // consume file 1
+ runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4");
+ Path arc1 = new Path(archive.toString() + "/file1.txt");
+ checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);
+
+ // consume file 2
+ runSpout(spout, "r6", "a5", "a6", "a7", "a8", "a9");
+ Path arc2 = new Path(archive.toString() + "/file2.txt");
+ checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+ }
+
private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
ArrayList<String> expected = new ArrayList<>();
for (Path txtFile : txtFiles) {
@@ -190,11 +212,6 @@ public class TestHdfsSpout {
return result;
}
- private void listCompletedDir() throws IOException {
- listDir(source);
- listDir(archive);
- }
-
private List<String> listDir(Path p) throws IOException {
ArrayList<String> result = new ArrayList<>();
System.err.println("*** Listing " + p);
@@ -209,28 +226,97 @@ public class TestHdfsSpout {
@Test
- public void testSimpleSequenceFile() throws IOException {
+ public void testMultipleFileConsumption_Ack() throws Exception {
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 5);
+
+ Map conf = getDefaultConfig();
+ conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+ conf.put(Configs.COMMIT_FREQ_SEC, "1");
+ conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
+ HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+ // read few lines from file1 dont ack
+ runSpout(spout, "r3");
+ FileReader reader = getField(spout, "reader");
+ Assert.assertNotNull(reader);
+ Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely"));
+
+ // read remaining lines
+ runSpout(spout, "r3");
+ reader = getField(spout, "reader");
+ Assert.assertNotNull(reader);
+ Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely") );
+
+ // ack few
+ runSpout(spout, "a0", "a1", "a2");
+ reader = getField(spout, "reader");
+ Assert.assertNotNull(reader);
+ Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely"));
+
+ //ack rest
+ runSpout(spout, "a3", "a4");
+ reader = getField(spout, "reader");
+ Assert.assertNull(reader);
+ Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely"));
+
+
+ // go to next file
+ Path file2 = new Path(source.toString() + "/file2.txt");
+ createTextFile(file2, 5);
+
+ // Read 1 line
+ runSpout(spout, "r1");
+ Assert.assertNotNull(getField(spout, "reader"));
+ Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely"));
+ // ack 1 tuple
+ runSpout(spout, "a5");
+ Assert.assertNotNull(getField(spout, "reader"));
+ Assert.assertEquals(false, getBoolField(spout, "fileReadCompletely"));
+
+
+ // read and ack remaining lines
+ runSpout(spout, "r5", "a6", "a7", "a8", "a9");
+ Assert.assertNull(getField(spout, "reader"));
+ Assert.assertEquals(true, getBoolField(spout, "fileReadCompletely"));
+ }
+
+ private static <T> T getField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
+ readerFld.setAccessible(true);
+ return (T) readerFld.get(spout);
+ }
+
+ private static boolean getBoolField(HdfsSpout spout, String fieldName) throws NoSuchFieldException, IllegalAccessException {
+ Field readerFld = HdfsSpout.class.getDeclaredField(fieldName);
+ readerFld.setAccessible(true);
+ return readerFld.getBoolean(spout);
+ }
+
+
+ @Test
+ public void testSimpleSequenceFile() throws IOException {
+ //1) create a couple files to consume
source = new Path("/tmp/hdfsspout/source");
fs.mkdirs(source);
archive = new Path("/tmp/hdfsspout/archive");
fs.mkdirs(archive);
Path file1 = new Path(source + "/file1.seq");
- createSeqFile(fs, file1);
+ createSeqFile(fs, file1, 5);
Path file2 = new Path(source + "/file2.seq");
- createSeqFile(fs, file2);
+ createSeqFile(fs, file2, 5);
Map conf = getDefaultConfig();
HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
- List<String> res = runSpout(spout, "r11", "a0", "a1", "a2", "a3", "a4");
- for (String re : res) {
- System.err.println(re);
- }
+ // consume both files
+ List<String> res = runSpout(spout, "r11");
+ Assert.assertEquals(10, res.size());
- listDir(archive);
+ Assert.assertEquals(2, listDir(archive).size());
Path f1 = new Path(archive + "/file1.seq");
@@ -401,7 +487,7 @@ public class TestHdfsSpout {
* fN - fail, item number: N
*/
- private List<String> runSpout(HdfsSpout spout, String... cmds) {
+ private List<String> runSpout(HdfsSpout spout, String... cmds) {
MockCollector collector = (MockCollector) spout.getCollector();
for(String cmd : cmds) {
if(cmd.startsWith("r")) {
@@ -437,7 +523,7 @@ public class TestHdfsSpout {
- private static void createSeqFile(FileSystem fs, Path file) throws IOException {
+ private static void createSeqFile(FileSystem fs, Path file, int rowCount) throws IOException {
Configuration conf = new Configuration();
try {
@@ -446,7 +532,7 @@ public class TestHdfsSpout {
}
SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class );
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < rowCount; i++) {
w.append(new IntWritable(i), new Text("line " + i));
}
w.close();