You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2017/08/01 13:09:30 UTC
[apex-malhar] branch master updated: APEXMALHAR-2529 Allow subclasses of AbstractFileInputOperator to have their own logic to suspend emit for the current streaming window
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new 09c4f57 APEXMALHAR-2529 Allow subclasses of AbstractFileInputOperator to have their own logic to suspend emit for the current streaming window
09c4f57 is described below
commit 09c4f5727287e12d47bfa022ad1e6606062d5844
Author: David Yan <da...@apache.org>
AuthorDate: Sat Jul 29 23:14:35 2017 -0700
APEXMALHAR-2529 Allow subclasses of AbstractFileInputOperator to have their own logic to suspend emit for the current streaming window
---
.../com/datatorrent/lib/io/fs/AbstractFileInputOperator.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 5c34546..228856d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -680,7 +680,7 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
try {
int counterForTuple = 0;
- while (counterForTuple++ < emitBatchSize) {
+ while (!suspendEmit() && counterForTuple++ < emitBatchSize) {
T line = readEntity();
if (line == null) {
LOG.info("done reading file ({} entries).", offset);
@@ -711,6 +711,15 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
}
/**
+ * Whether or not to suspend emission of tuples, regardless of emitBatchSize.
+ * This is useful for subclasses to have their own logic of whether to emit during the current window.
+ */
+ protected boolean suspendEmit()
+ {
+ return false;
+ }
+
+ /**
* Notifies that the directory is being scanned.<br>
* Override this method to custom handling. Will be called once
*/
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].