You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 07:16:09 UTC
apex-malhar git commit: APEXMALHAR-2195 Fix LineReaderContext last record [Forced Update!]
Repository: apex-malhar
Updated Branches:
refs/heads/master 390b41c43 -> c0af266ae (forced update)
APEXMALHAR-2195 Fix LineReaderContext last record
1. Fixing ReaderContext Issue
2. Changes in the test app
3. Incorporating review comments
4. Graceful handling of test termination
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c0af266a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c0af266a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c0af266a
Branch: refs/heads/master
Commit: c0af266aeec4f1a7bfbb0fc4bc5516d515ea3650
Parents: 2b2d5bc
Author: yogidevendra <yo...@apache.org>
Authored: Sun Aug 14 23:15:30 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Fri Aug 26 12:36:04 2016 +0530
----------------------------------------------------------------------
.../datatorrent/lib/io/block/ReaderContext.java | 16 +++++++++------
.../apex/malhar/lib/fs/FSRecordReaderTest.java | 21 ++++++++++----------
2 files changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c0af266a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
index 1ee6c53..6fe47a2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
@@ -152,7 +152,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
private final transient ByteArrayOutputStream tmpBuilder;
private transient byte[] buffer;
- private transient String strBuffer;
+ private transient String bufferStr;
private transient int posInStr;
public LineReaderContext()
@@ -190,11 +190,11 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
if (bytesRead == -1) {
break;
}
- strBuffer = new String(buffer);
+ bufferStr = new String(buffer,0, bytesRead);
}
- while (posInStr < strBuffer.length()) {
- char c = strBuffer.charAt(posInStr);
+ while (posInStr < bufferStr.length()) {
+ char c = bufferStr.charAt(posInStr);
if (c != '\r' && c != '\n') {
tmpBuilder.write(c);
posInStr++;
@@ -208,8 +208,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
lineBuilder.write(subLine);
if (foundEOL) {
- while (posInStr < strBuffer.length()) {
- char c = strBuffer.charAt(posInStr);
+ while (posInStr < bufferStr.length()) {
+ char c = bufferStr.charAt(posInStr);
if (c == '\r' || c == '\n') {
emptyBuilder.write(c);
posInStr++;
@@ -219,6 +219,10 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
}
usedBytes += emptyBuilder.toByteArray().length;
} else {
+ //end of stream reached
+ if (bytesRead < bufferSize) {
+ break;
+ }
//read more bytes from the input stream
posInStr = 0;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c0af266a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
index fdd888c..ecaff70 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
@@ -90,14 +90,22 @@ public class FSRecordReaderTest
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(true);
lc.runAsync();
- LOG.debug("Waiting for app to finish");
- Thread.sleep(1000 * 1);
+
+ Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+ expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+
+ while (DelimitedValidator.records.size() != expectedRecords.size()) {
+ LOG.debug("Waiting for app to finish");
+ Thread.sleep(1000);
+ }
lc.shutdown();
+ Assert.assertEquals(expectedRecords, DelimitedValidator.records);
+
}
public static class DelimitedValidator extends BaseOperator
{
- Set<String> records = new HashSet<String>();
+ static Set<String> records = new HashSet<String>();
public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
{
@@ -110,13 +118,6 @@ public class FSRecordReaderTest
}
};
- public void teardown()
- {
- Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
- expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
-
- Assert.assertEquals(expectedRecords, records);
- }
}
private static class DelimitedApplication implements StreamingApplication