You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/08/26 07:16:09 UTC

apex-malhar git commit: APEXMALHAR-2195 Fix LineReaderContext last record [Forced Update!]

Repository: apex-malhar
Updated Branches:
  refs/heads/master 390b41c43 -> c0af266ae (forced update)


APEXMALHAR-2195 Fix LineReaderContext last record

1. Fixing ReaderContext Issue

2. Changes in the test app

3. Incorporating review comments

4. Graceful handling of test termination


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c0af266a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c0af266a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c0af266a

Branch: refs/heads/master
Commit: c0af266aeec4f1a7bfbb0fc4bc5516d515ea3650
Parents: 2b2d5bc
Author: yogidevendra <yo...@apache.org>
Authored: Sun Aug 14 23:15:30 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Fri Aug 26 12:36:04 2016 +0530

----------------------------------------------------------------------
 .../datatorrent/lib/io/block/ReaderContext.java | 16 +++++++++------
 .../apex/malhar/lib/fs/FSRecordReaderTest.java  | 21 ++++++++++----------
 2 files changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c0af266a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
index 1ee6c53..6fe47a2 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java
@@ -152,7 +152,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
     private final transient ByteArrayOutputStream tmpBuilder;
 
     private transient byte[] buffer;
-    private transient String strBuffer;
+    private transient String bufferStr;
     private transient int posInStr;
 
     public LineReaderContext()
@@ -190,11 +190,11 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
           if (bytesRead == -1) {
             break;
           }
-          strBuffer = new String(buffer);
+          bufferStr = new String(buffer,0, bytesRead);
         }
 
-        while (posInStr < strBuffer.length()) {
-          char c = strBuffer.charAt(posInStr);
+        while (posInStr < bufferStr.length()) {
+          char c = bufferStr.charAt(posInStr);
           if (c != '\r' && c != '\n') {
             tmpBuilder.write(c);
             posInStr++;
@@ -208,8 +208,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
         lineBuilder.write(subLine);
 
         if (foundEOL) {
-          while (posInStr < strBuffer.length()) {
-            char c = strBuffer.charAt(posInStr);
+          while (posInStr < bufferStr.length()) {
+            char c = bufferStr.charAt(posInStr);
             if (c == '\r' || c == '\n') {
               emptyBuilder.write(c);
               posInStr++;
@@ -219,6 +219,10 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable>
           }
           usedBytes += emptyBuilder.toByteArray().length;
         } else {
+          //end of stream reached
+          if (bytesRead < bufferSize) {
+            break;
+          }
           //read more bytes from the input stream
           posInStr = 0;
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c0af266a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
index fdd888c..ecaff70 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java
@@ -90,14 +90,22 @@ public class FSRecordReaderTest
     LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(true);
     lc.runAsync();
-    LOG.debug("Waiting for app to finish");
-    Thread.sleep(1000 * 1);
+    
+    Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
+    expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
+    
+    while (DelimitedValidator.records.size() != expectedRecords.size()) {
+      LOG.debug("Waiting for app to finish");
+      Thread.sleep(1000);
+    }
     lc.shutdown();
+    Assert.assertEquals(expectedRecords, DelimitedValidator.records);
+
   }
 
   public static class DelimitedValidator extends BaseOperator
   {
-    Set<String> records = new HashSet<String>();
+    static Set<String> records = new HashSet<String>();
 
     public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>()
     {
@@ -110,13 +118,6 @@ public class FSRecordReaderTest
       }
     };
 
-    public void teardown()
-    {
-      Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n")));
-      expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n")));
-
-      Assert.assertEquals(expectedRecords, records);
-    }
   }
 
   private static class DelimitedApplication implements StreamingApplication