You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/01 13:09:30 UTC

[apex-malhar] branch master updated: APEXMALHAR-2529 Allow subclasses of AbstractFileInputOperator to have their own logic to suspend emit for the current streaming window

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new 09c4f57  APEXMALHAR-2529 Allow subclasses of AbstractFileInputOperator to have their own logic to suspend emit for the current streaming window
09c4f57 is described below

commit 09c4f5727287e12d47bfa022ad1e6606062d5844
Author: David Yan <da...@apache.org>
AuthorDate: Sat Jul 29 23:14:35 2017 -0700

    APEXMALHAR-2529 Allow subclasses of AbstractFileInputOperator to have their own logic to suspend emit for the current streaming window
---
 .../com/datatorrent/lib/io/fs/AbstractFileInputOperator.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 5c34546..228856d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -680,7 +680,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
 
       try {
         int counterForTuple = 0;
-        while (counterForTuple++ < emitBatchSize) {
+        while (!suspendEmit() && counterForTuple++ < emitBatchSize) {
           T line = readEntity();
           if (line == null) {
             LOG.info("done reading file ({} entries).", offset);
@@ -711,6 +711,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
   }
 
   /**
+   * Returns whether to suspend emission of tuples for the current streaming window, regardless of emitBatchSize.
+   * Subclasses can override this method to apply their own logic for deciding whether to emit during the current window.
+   */
+  protected boolean suspendEmit()
+  {
+    return false;
+  }
+
+  /**
    * Notifies that the directory is being scanned.<br>
    * Override this method for custom handling. Will be called once.
    */

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].