You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/01/17 01:57:44 UTC
flume git commit: FLUME-2568. Additional fix for TestReliableSpoolingFileEventReader
Repository: flume
Updated Branches:
refs/heads/trunk 1d9bab676 -> 91c58804d
FLUME-2568. Additional fix for TestReliableSpoolingFileEventReader
(Johny Rufus via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/91c58804
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/91c58804
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/91c58804
Branch: refs/heads/trunk
Commit: 91c58804da51a551fcbbe290261810c7750ee749
Parents: 1d9bab6
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Jan 16 16:56:48 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Jan 16 16:56:48 2015 -0800
----------------------------------------------------------------------
.../TestReliableSpoolingFileEventReader.java | 31 ++++++++++++--------
1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/91c58804/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 476bbff..4e90054 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -238,30 +238,28 @@ public class TestReliableSpoolingFileEventReader {
FileUtils.write(fileName,
"New file created in the end. Shoud be read randomly.\n");
Set<String> expected = Sets.newHashSet();
- File tempDir = Files.createTempDir();
- File tempFile = new File(tempDir, "t");
- File finalFile = new File(WORK_DIR, "t-file");
int totalFiles = WORK_DIR.listFiles().length;
- FileUtils.write(tempFile, "Last file");
final Set<String> actual = Sets.newHashSet();
ExecutorService executor = Executors.newSingleThreadExecutor();
- final Semaphore semaphore = new Semaphore(0);
+ final Semaphore semaphore1 = new Semaphore(0);
+ final Semaphore semaphore2 = new Semaphore(0);
Future<Void> wait = executor.submit(
new Callable<Void>() {
@Override
public Void call() throws Exception {
- readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore);
+ readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore1, semaphore2);
return null;
}
}
);
- semaphore.acquire();
- tempFile.renameTo(finalFile);
+ semaphore1.acquire();
+ File finalFile = new File(WORK_DIR, "t-file");
+ FileUtils.write(finalFile, "Last file");
+ semaphore2.release();
wait.get();
int listFilesCount = ((ReliableSpoolingFileEventReader)reader)
.getListFilesCount();
finalFile.delete();
- FileUtils.deleteQuietly(tempDir);
createExpectedFromFilesInSetup(expected);
expected.add("");
expected.add(
@@ -496,13 +494,14 @@ public class TestReliableSpoolingFileEventReader {
private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
Collection<String> actual) throws IOException {
- readEventsForFilesInDir(dir, reader, actual, null);
+ readEventsForFilesInDir(dir, reader, actual, null, null);
}
/* Read events, one for each file in the given directory. */
private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
- Collection<String> actual, Semaphore semaphore) throws IOException {
+ Collection<String> actual, Semaphore semaphore1, Semaphore semaphore2) throws IOException {
List<Event> events;
+ boolean executed = false;
for (int i=0; i < listFiles(dir).size(); i++) {
events = reader.readEvents(10);
for (Event e : events) {
@@ -510,8 +509,14 @@ public class TestReliableSpoolingFileEventReader {
}
reader.commit();
try {
- if (semaphore != null) {
- semaphore.release();
+ if(!executed) {
+ executed = true;
+ if (semaphore1 != null) {
+ semaphore1.release();
+ }
+ if (semaphore2 != null) {
+ semaphore2.acquire();
+ }
}
} catch (Exception ex) {
throw new IOException(ex);