You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/10/29 11:54:23 UTC

[GitHub] johnyangk closed pull request #130: [NEMO-233] Emit watermark at unbounded source

johnyangk closed pull request #130: [NEMO-233] Emit watermark at unbounded source 
URL: https://github.com/apache/incubator-nemo/pull/130
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff after a merge, the diff is
reproduced below for provenance:

Because this is a foreign pull request (from a fork), the diff is included
below; GitHub would not display it otherwise:

diff --git a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
new file mode 100644
index 000000000..4d157555f
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir;
+
+import java.util.Iterator;
+
+/**
+ * An abstract readable class that retrieves data from iterator.
+ * @param <O> output type.
+ */
+public abstract class BoundedIteratorReadable<O> implements Readable<O> {
+
+  private Iterator<O> iterator;
+
+  /**
+   * Initialize iterator.
+   * @return iterator
+   */
+  protected abstract Iterator<O> initializeIterator();
+
+  /**
+   * Prepare reading data.
+   */
+  @Override
+  public final void prepare() {
+    iterator = initializeIterator();
+  }
+
+  @Override
+  public final O readCurrent() {
+    return iterator.next();
+  }
+
+  @Override
+  public final void advance() {
+    // do nothing
+  }
+
+  @Override
+  public final boolean isFinished() {
+    return !iterator.hasNext();
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
index 2955ed32d..e0f7b7c79 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
@@ -21,19 +21,44 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * Interface for readable.
  * @param <O> output type.
  */
 public interface Readable<O> extends Serializable {
+
+  /**
+   * Prepare reading data.
+   */
+  void prepare();
+
   /**
-   * Method to read data from the source.
+   * Method to read current data from the source.
+   * The caller should check whether the Readable is finished or not by using isFinished() method
+   * before calling this method.
    *
-   * @return an {@link Iterable} of the data read by the readable.
-   * @throws IOException exception while reading data.
+   * It can throw NoSuchElementException although it is not finished in Unbounded source.
+   * @return a data read by the readable.
+   */
+  O readCurrent() throws NoSuchElementException;
+
+  /**
+   * Advance current data point.
+   */
+  void advance() throws IOException;
+
+  /**
+   * Read watermark.
+   * @return watermark
+   */
+  long readWatermark();
+
+  /**
+   * @return true if it reads all data.
    */
-  Iterable<O> read() throws IOException;
+  boolean isFinished();
 
   /**
    * Returns the list of locations where this readable resides.
@@ -44,4 +69,9 @@
    * @throws Exception                     any other exceptions on the way
    */
   List<String> getLocations() throws Exception;
+
+  /**
+   * Close.
+   */
+  void close() throws IOException;
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
index fe42ed5bd..6e909cfb3 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -22,7 +22,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -61,6 +60,12 @@ public CachedSourceVertex getClone() {
     return that;
   }
 
+  @Override
+  public boolean isBounded() {
+    // It supports only bounded source.
+    return true;
+  }
+
   @Override
   public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
     // Ignore the desired number of splits.
@@ -77,7 +82,6 @@ public void clearInternalStates() {
    * It does not contain any actual data but the data will be sent from the cached store through external input reader.
    */
   private final class CachedReadable implements Readable<T> {
-
     /**
      * Constructor.
      */
@@ -86,13 +90,41 @@ private CachedReadable() {
     }
 
     @Override
-    public Iterable<T> read() throws IOException {
-      return Collections.emptyList();
+    public void prepare() {
+
+    }
+
+    @Override
+    public T readCurrent() {
+      throw new UnsupportedOperationException(
+        "CachedSourceVertex should not be used");
+    }
+
+    @Override
+    public void advance() throws IOException {
+      throw new UnsupportedOperationException(
+        "CachedSourceVertex should not be used");
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException(
+        "CachedSourceVertex should not be used");
+    }
+
+    @Override
+    public boolean isFinished() {
+      return true;
     }
 
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
index 6cf200920..3278963dc 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -18,8 +18,10 @@
  */
 package org.apache.nemo.common.ir.vertex;
 
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.Readable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -56,6 +58,11 @@ public InMemorySourceVertex(final InMemorySourceVertex that) {
     return new InMemorySourceVertex<>(this);
   }
 
+  @Override
+  public boolean isBounded() {
+    return true;
+  }
+
   @Override
   public List<Readable<T>> getReadables(final int desiredNumOfSplits) throws Exception {
 
@@ -88,7 +95,8 @@ public void clearInternalStates() {
    * Simply returns the in-memory data.
    * @param <T> type of the data.
    */
-  private static final class InMemorySourceReadable<T> implements Readable<T> {
+  private static final class InMemorySourceReadable<T> extends BoundedIteratorReadable<T> {
+
     private final Iterable<T> initializedSourceData;
 
     /**
@@ -96,17 +104,28 @@ public void clearInternalStates() {
      * @param initializedSourceData the source data.
      */
     private InMemorySourceReadable(final Iterable<T> initializedSourceData) {
+      super();
       this.initializedSourceData = initializedSourceData;
     }
 
     @Override
-    public Iterable<T> read() {
-      return this.initializedSourceData;
+    protected Iterator<T> initializeIterator() {
+      return initializedSourceData.iterator();
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
     }
 
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
index 6a86064de..789249098 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/SourceVertex.java
@@ -36,6 +36,8 @@ public SourceVertex() {
     super();
   }
 
+  public abstract boolean isBounded();
+
   /**
    * Copy Constructor for SourceVertex.
    *
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
new file mode 100644
index 000000000..e801c2518
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Finishmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+/**
+ * Finish mark that notifies the data fetching is finished.
+ * This is only used for bounded source because unbounded source does not finish.
+ */
+public final class Finishmark {
+  private static final Finishmark INSTANCE = new Finishmark();
+
+  private Finishmark() {
+
+  }
+
+  public static Finishmark getInstance() {
+    return INSTANCE;
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
new file mode 100644
index 000000000..4f24a80f7
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+
+/**
+ * Watermark event.
+ */
+public final class Watermark implements Serializable {
+  private final long timestamp;
+  public Watermark(final long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}
diff --git a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 1ce26373b..9c95bed3a 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -36,6 +36,7 @@
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.beam.sdk.values.KV;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -204,6 +205,11 @@ public String toString() {
       return sb.toString();
     }
 
+    @Override
+    public boolean isBounded() {
+      return true;
+    }
+
     @Override
     public List<Readable<T>> getReadables(final int desirednumOfSplits) {
       final List list = new ArrayList(desirednumOfSplits);
@@ -230,13 +236,37 @@ public void clearInternalStates() {
    */
   static final class EmptyReadable<T> implements Readable<T> {
     @Override
-    public Iterable<T> read() {
-      return new ArrayList<>();
+    public void prepare() {
+
+    }
+
+    @Override
+    public T readCurrent() {
+      return null;
+    }
+
+    @Override
+    public void advance() {
+    }
+
+    @Override
+    public long readWatermark() {
+      return 0;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return true;
     }
 
     @Override
     public List<String> getLocations() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 89be5ba47..58a6d4a53 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -24,6 +24,7 @@
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
@@ -225,7 +226,7 @@ private static Transform createGBKTransform(
       Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     final TupleTag mainOutputTag = new TupleTag<>();
 
-    if (mainInput.getWindowingStrategy() == WindowingStrategy.globalDefault()) {
+    if (mainInput.getWindowingStrategy().getWindowFn() instanceof GlobalWindows) {
       return new GroupByKeyTransform();
     } else {
       return new GroupByKeyAndWindowDoFnTransform(
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 2602e7243..880a3a04d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -71,6 +72,11 @@ public BeamBoundedSourceVertex getClone() {
     return new BeamBoundedSourceVertex(this);
   }
 
+  @Override
+  public boolean isBounded() {
+    return true;
+  }
+
   @Override
   public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
     final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
@@ -99,6 +105,9 @@ public ObjectNode getPropertiesAsJsonNode() {
    */
   private static final class BoundedSourceReadable<T> implements Readable<WindowedValue<T>> {
     private final BoundedSource<T> boundedSource;
+    private boolean finished = false;
+    private BoundedSource.BoundedReader<T> reader;
+    private Function<T, WindowedValue<T>> windowedValueConverter;
 
     /**
      * Constructor of the BoundedSourceReadable.
@@ -109,32 +118,48 @@ public ObjectNode getPropertiesAsJsonNode() {
     }
 
     @Override
-    public Iterable<WindowedValue<T>> read() throws IOException {
-      boolean started = false;
-      boolean windowed = false;
-
-      final ArrayList<WindowedValue<T>> elements = new ArrayList<>();
-      try (BoundedSource.BoundedReader<T> reader = boundedSource.createReader(null)) {
-        for (boolean available = reader.start(); available; available = reader.advance()) {
-          final T elem = reader.getCurrent();
-
-          // Check whether the element is windowed or not
-          // We only have to check the first element.
-          if (!started) {
-            started = true;
-            if (elem instanceof WindowedValue) {
-              windowed = true;
-            }
-          }
+    public void prepare() {
+      try {
+        reader = boundedSource.createReader(null);
+        finished = !reader.start();
+
+        if (!finished) {
+          T elem = reader.getCurrent();
 
-          if (!windowed) {
-            elements.add(WindowedValue.valueInGlobalWindow(reader.getCurrent()));
+          if (elem instanceof WindowedValue) {
+            windowedValueConverter = val -> (WindowedValue) val;
           } else {
-            elements.add((WindowedValue<T>) elem);
+            windowedValueConverter = WindowedValue::valueInGlobalWindow;
           }
         }
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
       }
-      return elements;
+    }
+
+    @Override
+    public WindowedValue<T> readCurrent() {
+      if (finished) {
+        throw new IllegalStateException("Bounded reader read all elements");
+      }
+
+      final T elem = reader.getCurrent();
+      return windowedValueConverter.apply(elem);
+    }
+
+    @Override
+    public void advance() throws IOException {
+      finished = !reader.advance();
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
+    }
+
+    @Override
+    public boolean isFinished() {
+      return finished;
     }
 
     @Override
@@ -149,5 +174,11 @@ public ObjectNode getPropertiesAsJsonNode() {
         throw new UnsupportedOperationException();
       }
     }
+
+    @Override
+    public void close() throws IOException {
+      finished = true;
+      reader.close();
+    }
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 8ce26fe6f..97adc5bb6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -29,8 +29,9 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
 
 /**
  * SourceVertex implementation for UnboundedSource.
@@ -38,14 +39,12 @@
  * @param <M> checkpoint mark type.
  */
 public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.CheckpointMark> extends
-  SourceVertex<WindowedValue<O>> {
+  SourceVertex<Object> {
 
   private static final Logger LOG = LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
   private UnboundedSource<O, M> source;
   private final String sourceDescription;
 
-  private static final long POLLING_INTERVAL = 10L;
-
   /**
    * The default constructor for beam unbounded source.
    * @param source unbounded source.
@@ -68,8 +67,13 @@ public IRVertex getClone() {
   }
 
   @Override
-  public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
-    final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
+  public boolean isBounded() {
+    return false;
+  }
+
+  @Override
+  public List<Readable<Object>> getReadables(final int desiredNumOfSplits) throws Exception {
+    final List<Readable<Object>> readables = new ArrayList<>();
     source.split(desiredNumOfSplits, null)
       .forEach(unboundedSource -> readables.add(new UnboundedSourceReadable<>(unboundedSource)));
     return readables;
@@ -93,90 +97,80 @@ public ObjectNode getPropertiesAsJsonNode() {
    * @param <M> checkpoint mark type.
    */
   private static final class UnboundedSourceReadable<O, M extends UnboundedSource.CheckpointMark>
-      implements Readable<WindowedValue<O>> {
+      implements Readable<Object> {
     private final UnboundedSource<O, M> unboundedSource;
+    private UnboundedSource.UnboundedReader<O> reader;
+    private Function<O, WindowedValue<O>> windowedValueConverter;
+    private boolean finished = false;
 
     UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
       this.unboundedSource = unboundedSource;
     }
 
     @Override
-    public Iterable<WindowedValue<O>> read() throws IOException {
-      return new UnboundedSourceIterable<>(unboundedSource);
-    }
+    public void prepare() {
+      try {
+        reader = unboundedSource.createReader(null, null);
+        reader.start();
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    @Override
-    public List<String> getLocations() throws Exception {
-      return new ArrayList<>();
+      // get first element
+      final O firstElement = retrieveFirstElement();
+      if (firstElement instanceof WindowedValue) {
+        windowedValueConverter = val -> (WindowedValue) val;
+      } else {
+        windowedValueConverter = WindowedValue::valueInGlobalWindow;
+      }
     }
-  }
-
-  /**
-   * The iterable class for unbounded sources.
-   * @param <O> output type.
-   * @param <M> checkpoint mark type.
-   */
-  private static final class UnboundedSourceIterable<O, M extends UnboundedSource.CheckpointMark>
-      implements Iterable<WindowedValue<O>> {
 
-    private UnboundedSourceIterator<O, M> iterator;
-
-    UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource) throws IOException {
-      this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+    private O retrieveFirstElement() {
+      while (true) {
+        try {
+          return reader.getCurrent();
+        } catch (final NoSuchElementException e) {
+          // the first element is not currently available... retry
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e1) {
+            e1.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      }
     }
 
     @Override
-    public Iterator<WindowedValue<O>> iterator() {
-      return iterator;
+    public Object readCurrent() {
+      final O elem = reader.getCurrent();
+      return windowedValueConverter.apply(elem);
     }
-  }
 
-  /**
-   * The iterator for unbounded sources.
-   * @param <O> output type.
-   * @param <M> checkpoint mark type.
-   */
-  // TODO #233: Emit watermark at unbounded source
-  private static final class UnboundedSourceIterator<O, M extends UnboundedSource.CheckpointMark>
-      implements Iterator<WindowedValue<O>> {
+    @Override
+    public void advance() throws IOException {
+      reader.advance();
+    }
 
-    private final UnboundedSource.UnboundedReader<O> unboundedReader;
-    private boolean available;
+    @Override
+    public long readWatermark() {
+      return reader.getWatermark().getMillis();
+    }
 
-    UnboundedSourceIterator(final UnboundedSource<O, M> unboundedSource) throws IOException {
-      this.unboundedReader = unboundedSource.createReader(null, null);
-      available = unboundedReader.start();
+    @Override
+    public boolean isFinished() {
+      return finished;
     }
 
     @Override
-    public boolean hasNext() {
-      // Unbounded source always has next element until it finishes.
-      return true;
+    public List<String> getLocations() throws Exception {
+      return new ArrayList<>();
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public WindowedValue<O> next() {
-      try {
-        while (true) {
-          if (!available) {
-            Thread.sleep(POLLING_INTERVAL);
-          } else {
-            final O element = unboundedReader.getCurrent();
-            final boolean windowed = element instanceof WindowedValue;
-            if (!windowed) {
-              return WindowedValue.valueInGlobalWindow(element);
-            } else {
-              return (WindowedValue<O>) element;
-            }
-          }
-          available = unboundedReader.advance();
-        }
-      } catch (final InterruptedException | IOException e) {
-        LOG.error("Exception occurred while waiting for the events...");
-        e.printStackTrace();
-        throw new RuntimeException(e);
-      }
+    public void close() throws IOException {
+      finished = true;
+      reader.close();
     }
   }
 }
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 18eefb43f..31502beda 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.source;
 
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.compiler.frontend.spark.sql.Dataset;
@@ -73,6 +74,11 @@ public SparkDatasetBoundedSourceVertex getClone() {
     return new SparkDatasetBoundedSourceVertex(this);
   }
 
+  @Override
+  public boolean isBounded() {
+    return true;
+  }
+
   @Override
   public List<Readable<T>> getReadables(final int desiredNumOfSplits) {
     return readables;
@@ -86,7 +92,7 @@ public void clearInternalStates() {
   /**
    * A Readable wrapper for Spark Dataset.
    */
-  private final class SparkDatasetBoundedSourceReadable implements Readable<T> {
+  private final class SparkDatasetBoundedSourceReadable extends BoundedIteratorReadable<T> {
     private final LinkedHashMap<String, Object[]> commands;
     private final Map<String, String> sessionInitialConf;
     private final int partitionIndex;
@@ -111,11 +117,11 @@ private SparkDatasetBoundedSourceReadable(final Partition partition,
     }
 
     @Override
-    public Iterable<T> read() throws IOException {
+    protected Iterator<T> initializeIterator() {
       // for setting up the same environment in the executors.
       final SparkSession spark = SparkSession.builder()
-          .config(sessionInitialConf)
-          .getOrCreate();
+        .config(sessionInitialConf)
+        .getOrCreate();
       final Dataset<T> dataset;
 
       try {
@@ -126,8 +132,14 @@ private SparkDatasetBoundedSourceReadable(final Partition partition,
 
       // Spark does lazy evaluation: it doesn't load the full dataset, but only the partition it is asked for.
       final RDD<T> rdd = dataset.sparkRDD();
-      return () -> JavaConverters.asJavaIteratorConverter(
-          rdd.iterator(rdd.getPartitions()[partitionIndex], TaskContext$.MODULE$.empty())).asJava();
+      final Iterable<T> iterable = () -> JavaConverters.asJavaIteratorConverter(
+        rdd.iterator(rdd.getPartitions()[partitionIndex], TaskContext$.MODULE$.empty())).asJava();
+      return iterable.iterator();
+    }
+
+    @Override
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
     }
 
     @Override
@@ -138,5 +150,10 @@ private SparkDatasetBoundedSourceReadable(final Partition partition,
         return locations;
       }
     }
+
+    @Override
+    public void close() throws IOException {
+
+    }
   }
 }
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index ddfa85fd8..f0294b16e 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.source;
 
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.spark.*;
@@ -72,6 +73,11 @@ public SparkTextFileBoundedSourceVertex getClone() {
     return new SparkTextFileBoundedSourceVertex(this);
   }
 
+  @Override
+  public boolean isBounded() {
+    return true;
+  }
+
   @Override
   public List<Readable<String>> getReadables(final int desiredNumOfSplits) {
     return readables;
@@ -85,7 +91,7 @@ public void clearInternalStates() {
   /**
    * A Readable wrapper for Spark text file.
    */
-  private final class SparkTextFileBoundedSourceReadable implements Readable<String> {
+  private final class SparkTextFileBoundedSourceReadable extends BoundedIteratorReadable<String> {
     private final SparkConf sparkConf;
     private final int partitionIndex;
     private final List<String> locations;
@@ -114,14 +120,8 @@ private SparkTextFileBoundedSourceReadable(final Partition partition,
     }
 
     @Override
-    public Iterable<String> read() throws IOException {
-      // for setting up the same environment in the executors.
-      final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
-
-      // Spark does lazy evaluation: it doesn't load the full data in rdd, but only the partition it is asked for.
-      final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
-      return () -> JavaConverters.asJavaIteratorConverter(
-          rdd.iterator(rdd.getPartitions()[partitionIndex], TaskContext$.MODULE$.empty())).asJava();
+    public long readWatermark() {
+      throw new UnsupportedOperationException("No watermark");
     }
 
     @Override
@@ -132,5 +132,21 @@ private SparkTextFileBoundedSourceReadable(final Partition partition,
         return locations;
       }
     }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    protected Iterator<String> initializeIterator() {
+      // for setting up the same environment in the executors.
+      final SparkContext sparkContext = SparkContext.getOrCreate(sparkConf);
+
+      // Spark does lazy evaluation: it doesn't load the full data in rdd, but only the partition it is asked for.
+      final RDD<String> rdd = sparkContext.textFile(inputPath, numPartitions);
+      final Iterable<String> iterable = () -> JavaConverters.asJavaIteratorConverter(
+        rdd.iterator(rdd.getPartitions()[partitionIndex], TaskContext$.MODULE$.empty())).asJava();
+      return iterable.iterator();
+    }
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index 5b4777c33..2ca3df86c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -22,11 +22,12 @@
 import org.apache.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
+import java.util.NoSuchElementException;
 
 /**
  * An abstraction for fetching data from task-external sources.
  */
-abstract class DataFetcher {
+abstract class DataFetcher implements AutoCloseable {
   private final IRVertex dataSource;
   private final OutputCollector outputCollector;
 
@@ -42,7 +43,7 @@
    * @throws IOException upon I/O error
    * @throws java.util.NoSuchElementException if no more element is available
    */
-  abstract Object fetchDataElement() throws IOException;
+  abstract Object fetchDataElement() throws IOException, NoSuchElementException;
 
   OutputCollector getOutputCollector() {
     return outputCollector;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 70680ccb8..6098b3028 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -20,6 +20,7 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import org.slf4j.Logger;
@@ -28,7 +29,6 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -94,8 +94,7 @@ Object fetchDataElement() throws IOException {
       throw new IOException(e);
     }
 
-    // We throw the exception here, outside of the above try-catch region
-    throw new NoSuchElementException();
+    return Finishmark.getInstance();
   }
 
   private void advanceIterator() throws IOException {
@@ -160,4 +159,9 @@ private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
       LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
     }
   }
+
+  @Override
+  public void close() throws Exception {
+
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 505d24676..9ea8fa8fb 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,49 +20,92 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Fetches data from a data source.
  */
 class SourceVertexDataFetcher extends DataFetcher {
   private final Readable readable;
-
-  // Non-finals (lazy fetching)
-  private Iterator iterator;
   private long boundedSourceReadTime = 0;
+  private static final long WATERMARK_PERIOD = 1000; // ms
+  private final ScheduledExecutorService watermarkTriggerService;
+  private boolean watermarkTriggered = false;
+  private final boolean bounded;
 
-  SourceVertexDataFetcher(final IRVertex dataSource,
+  SourceVertexDataFetcher(final SourceVertex dataSource,
                           final Readable readable,
                           final OutputCollector outputCollector) {
     super(dataSource, outputCollector);
     this.readable = readable;
+    this.readable.prepare();
+    this.bounded = dataSource.isBounded();
+
+    if (!bounded) {
+      this.watermarkTriggerService = Executors.newScheduledThreadPool(1);
+      this.watermarkTriggerService.scheduleAtFixedRate(() -> {
+        watermarkTriggered = true;
+      }, WATERMARK_PERIOD, WATERMARK_PERIOD, TimeUnit.MILLISECONDS);
+    } else {
+      this.watermarkTriggerService = null;
+    }
   }
 
+  /**
+   * This is non-blocking operation.
+   * @return current data
+   * @throws NoSuchElementException if the current data is not available
+   */
   @Override
-  Object fetchDataElement() throws IOException {
-    if (iterator == null) {
-      fetchDataLazily();
+  Object fetchDataElement() throws NoSuchElementException, IOException {
+    if (readable.isFinished()) {
+      return Finishmark.getInstance();
+    } else {
+      final long start = System.currentTimeMillis();
+      final Object element = retrieveElement();
+      boundedSourceReadTime += System.currentTimeMillis() - start;
+      return element;
     }
+  }
 
-    if (iterator.hasNext()) {
-      return iterator.next();
-    } else {
-      throw new NoSuchElementException();
+  final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
+
+  @Override
+  public void close() throws Exception {
+    readable.close();
+    if (watermarkTriggerService != null) {
+      watermarkTriggerService.shutdown();
     }
   }
 
-  private void fetchDataLazily() throws IOException {
-    final long start = System.currentTimeMillis();
-    iterator = this.readable.read().iterator();
-    boundedSourceReadTime += System.currentTimeMillis() - start;
+  private boolean isWatermarkTriggerTime() {
+    if (watermarkTriggered) {
+      watermarkTriggered = false;
+      return true;
+    } else {
+      return false;
+    }
   }
 
-  final long getBoundedSourceReadTime() {
-    return boundedSourceReadTime;
+  private Object retrieveElement() throws NoSuchElementException, IOException {
+    // Emit watermark
+    if (!bounded && isWatermarkTriggerTime()) {
+      return new Watermark(readable.readWatermark());
+    }
+
+    // Data
+    final Object element = readable.readCurrent();
+    readable.advance();
+    return element;
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index fac3e768c..449542a88 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -28,6 +28,8 @@
 import org.apache.nemo.common.ir.vertex.*;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
@@ -62,7 +64,6 @@
 @NotThreadSafe
 public final class TaskExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
-  private static final int NONE_FINISHED = -1;
 
   // Essential information
   private boolean isExecuted;
@@ -194,7 +195,8 @@ public TaskExecutor(final Task task,
       // Source read
       if (irVertex instanceof SourceVertex) {
         // Source vertex read
-        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(irVertex, sourceReader.get(), outputCollector));
+        nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(
+          (SourceVertex) irVertex, sourceReader.get(), outputCollector));
       }
 
       // Parent-task read (broadcasts)
@@ -243,6 +245,11 @@ private void processElement(final OutputCollector outputCollector, final Object
     outputCollector.emit(dataElement);
   }
 
+  private void processWatermark(final OutputCollector outputCollector, final Watermark watermark) {
+    // TODO #231: Add onWatermark() method to Transform and
+    // TODO #231: fowards watermark to Transforms and OutputWriters
+  }
+
   /**
    * Execute a task, while handling unrecoverable errors and exceptions.
    */
@@ -303,46 +310,145 @@ private void finalizeVertex(final VertexHarness vertexHarness) {
   }
 
   /**
+   * Process an element generated from the dataFetcher.
+   * If the element is an instance of Finishmark, we remove the dataFetcher from the current list.
+   * @param element element
+   * @param dataFetcher current data fetcher
+   * @param dataFetchers current list
+   */
+  private void handleElement(final Object element,
+                             final DataFetcher dataFetcher,
+                             final List<DataFetcher> dataFetchers) {
+    if (element instanceof Finishmark) {
+      // We've consumed all the data from this data fetcher.
+      if (dataFetcher instanceof SourceVertexDataFetcher) {
+        boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
+      } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+        serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
+        encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
+      }
+
+      // remove current data fetcher from the list
+      dataFetchers.remove(dataFetcher);
+    } else if (element instanceof Watermark) {
+      // Watermark
+      processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+    } else {
+      // Process data element
+      processElement(dataFetcher.getOutputCollector(), element);
+    }
+  }
+
+  /**
+   * Check if it is time to poll pending fetchers' data.
+   * @param pollingPeriod polling period
+   * @param currentTime current time
+   * @param prevTime prev time
+   */
+  private boolean isPollingTime(final long pollingPeriod,
+                                final long currentTime,
+                                final long prevTime) {
+    return (currentTime - prevTime) >= pollingPeriod;
+  }
+
+  /**
+   * This retrieves data from data fetchers and process them.
+   * It maintains two lists:
+   *  -- availableFetchers: maintain data fetchers that currently have data elements to retreive
+   *  -- pendingFetchers: maintain data fetchers that currently do not have available elements.
+   *     This can become available in the future, and therefore we check the pending fetchers every pollingInterval.
+   *
+   *  If a data fetcher finishes, we remove it from the two lists.
+   *  If a data fetcher has no available element, we move the data fetcher to pendingFetchers
+   *  If a pending data fetcher has element, we move it to availableFetchers
+   *  If there are no available fetchers but pending fetchers, sleep for pollingPeriod
+   *  and retry fetching data from the pendingFetchers.
+   *
    * @param fetchers to handle.
    * @return false if IOException.
    */
   private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
-    final List<DataFetcher> availableFetchers = new ArrayList<>(fetchers);
-    while (!availableFetchers.isEmpty()) { // empty means we've consumed all task-external input data
-      // For this looping of available fetchers.
-      int finishedFetcherIndex = NONE_FINISHED;
-      for (int i = 0; i < availableFetchers.size(); i++) {
-        final DataFetcher dataFetcher = availableFetchers.get(i);
-        final Object element;
+    final List<DataFetcher> availableFetchers = new LinkedList<>(fetchers);
+    final List<DataFetcher> pendingFetchers = new LinkedList<>();
+
+    // Polling interval.
+    final long pollingInterval = 100; // ms
+
+    // Previous polling time
+    long prevPollingTime = System.currentTimeMillis();
+
+    // empty means we've consumed all task-external input data
+    while (!availableFetchers.isEmpty() || !pendingFetchers.isEmpty()) {
+      // We first fetch data from available data fetchers
+      final Iterator<DataFetcher> availableIterator = availableFetchers.iterator();
+
+      while (availableIterator.hasNext()) {
+        final DataFetcher dataFetcher = availableIterator.next();
         try {
-          element = dataFetcher.fetchDataElement();
-        } catch (NoSuchElementException e) {
-          // We've consumed all the data from this data fetcher.
-          if (dataFetcher instanceof SourceVertexDataFetcher) {
-            boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
-          } else if (dataFetcher instanceof ParentTaskDataFetcher) {
-            serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
-            encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
-          }
-          finishedFetcherIndex = i;
-          break;
-        } catch (IOException e) {
+          handleElement(dataFetcher.fetchDataElement(), dataFetcher, availableFetchers);
+        } catch (final NoSuchElementException e) {
+          // No element in current data fetcher, fetch data from next fetcher
+          // move current data fetcher to pending.
+          availableIterator.remove();
+          pendingFetchers.add(dataFetcher);
+        } catch (final IOException e) {
           // IOException means that this task should be retried.
           taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
             Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
           LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
           return false;
         }
+      }
+
+      final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
+      final long currentTime = System.currentTimeMillis();
+      // We check pending data every polling interval
+      while (pendingIterator.hasNext()
+        && isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
+        prevPollingTime = currentTime;
+
+        final DataFetcher dataFetcher = pendingIterator.next();
+        try {
+          handleElement(dataFetcher.fetchDataElement(), dataFetcher, pendingFetchers);
 
-        // Successfully fetched an element
-        processElement(dataFetcher.getOutputCollector(), element);
+          // We processed data. This means the data fetcher is now available.
+          // Add current data fetcher to available
+          pendingIterator.remove();
+          availableFetchers.add(dataFetcher);
+
+        } catch (final NoSuchElementException e) {
+          // The current data fetcher is still pending.. try next data fetcher
+        } catch (final IOException e) {
+          // IOException means that this task should be retried.
+          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+            Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e);
+          return false;
+        }
       }
 
-      // Remove the finished fetcher from the list
-      if (finishedFetcherIndex != NONE_FINISHED) {
-        availableFetchers.remove(finishedFetcherIndex);
+      // If there are no available fetchers,
+      // Sleep and retry fetching element from pending fetchers every polling interval
+      if (availableFetchers.isEmpty() && !pendingFetchers.isEmpty()) {
+        try {
+          Thread.sleep(pollingInterval);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
       }
     }
+
+    // Close all data fetchers
+    fetchers.forEach(fetcher -> {
+      try {
+        fetcher.close();
+      } catch (final Exception e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+    });
+
     return true;
   }
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index b12024184..705f833bb 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
@@ -44,16 +45,14 @@
 @PrepareForTest({InputReader.class, VertexHarness.class})
 public final class ParentTaskDataFetcherTest {
 
-  @Test(timeout=5000, expected = NoSuchElementException.class)
+  @Test(timeout=5000)
   public void testEmpty() throws Exception {
     final List<String> empty = new ArrayList<>(0); // empty data
     final InputReader inputReader = generateInputReader(generateCompletableFuture(empty.iterator()));
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
-
-    // Should trigger the expected 'NoSuchElementException'
-    fetcher.fetchDataElement();
+    assertEquals(Finishmark.getInstance(), fetcher.fetchDataElement());
   }
 
   @Test(timeout=5000)
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index d346a9df1..e0c4f957a 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.BoundedIteratorReadable;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
@@ -32,6 +33,7 @@
 import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import org.apache.nemo.common.ir.vertex.InMemorySourceVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
@@ -61,9 +63,11 @@
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -130,20 +134,32 @@ private boolean checkEqualElements(final List<Integer> left, final List<Integer>
   /**
    * Test source vertex data fetching.
    */
-  @Test(timeout=5000)
+  @Test()
   public void testSourceVertexDataFetching() throws Exception {
     final IRVertex sourceIRVertex = new InMemorySourceVertex<>(elements);
 
-    final Readable readable = new Readable() {
+    final Readable readable = new BoundedIteratorReadable() {
+      @Override
+      protected Iterator initializeIterator() {
+        return elements.iterator();
+      }
+
       @Override
-      public Iterable read() throws IOException {
-        return elements;
+      public long readWatermark() {
+        throw new UnsupportedOperationException();
       }
+
       @Override
       public List<String> getLocations() {
         throw new UnsupportedOperationException();
       }
+
+      @Override
+      public void close() throws IOException {
+
+      }
     };
+
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
 
@@ -171,6 +187,116 @@ public Iterable read() throws IOException {
     assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
   }
 
+  /**
+   * This test emits data and watermark by emulating an unbounded source readable.
+   */
+  @Test()
+  public void testUnboundedSourceVertexDataFetching() throws Exception {
+    final IRVertex sourceIRVertex = new SourceVertex() {
+      @Override
+      public IRVertex getClone() {
+        return this;
+      }
+
+      @Override
+      public boolean isBounded() {
+        return false;
+      }
+
+      @Override
+      public List<Readable> getReadables(int desiredNumOfSplits) throws Exception {
+        return null;
+      }
+
+      @Override
+      public void clearInternalStates() {
+
+      }
+    };
+
+    final long watermark = 1234567L;
+    final AtomicLong emittedWatermark = new AtomicLong(0);
+
+    final Readable readable = new Readable() {
+      int pointer = 0;
+      final int middle = elements.size() / 2;
+      final int end = elements.size();
+      boolean watermarkEmitted = false;
+
+      @Override
+      public void prepare() {
+
+      }
+
+      // This emulates unbounded source that throws NoSuchElementException
+      // It reads current data until middle point and  throws NoSuchElementException at the middle point.
+      // It resumes the data reading after emitting a watermark, and finishes at the end of the data.
+      @Override
+      public Object readCurrent() throws NoSuchElementException {
+        if (pointer == middle && !watermarkEmitted) {
+          throw new NoSuchElementException();
+        }
+
+        return elements.get(pointer);
+      }
+
+      @Override
+      public void advance() throws IOException {
+        pointer += 1;
+      }
+
+      @Override
+      public long readWatermark() {
+        watermarkEmitted = true;
+        emittedWatermark.set(watermark);
+        return watermark;
+      }
+
+      @Override
+      public boolean isFinished() {
+        return pointer == end;
+      }
+
+      @Override
+      public List<String> getLocations() throws Exception {
+        return null;
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+    };
+
+    final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+    vertexIdToReadable.put(sourceIRVertex.getId(), readable);
+
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+      new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+        .addVertex(sourceIRVertex)
+        .buildWithoutSourceSinkCheck();
+
+    final StageEdge taskOutEdge = mockStageEdgeFrom(sourceIRVertex);
+    final Task task =
+      new Task(
+        "testSourceVertexDataFetching",
+        generateTaskId(),
+        TASK_EXECUTION_PROPERTY_MAP,
+        new byte[0],
+        Collections.emptyList(),
+        Collections.singletonList(taskOutEdge),
+        vertexIdToReadable);
+
+    // Execute the task.
+    final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
+    taskExecutor.execute();
+
+    // Check whether the watermark is emitted
+    assertEquals(watermark, emittedWatermark.get());
+
+    // Check the output.
+    assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+  }
+
   /**
    * Test parent task data fetching.
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services