You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/08/09 05:23:52 UTC

[incubator-nemo] branch master updated: [NEMO-160] Handle Beam VoidCoder properly (#93)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ec68b6  [NEMO-160] Handle Beam VoidCoder properly (#93)
0ec68b6 is described below

commit 0ec68b6f0be0f20969eda0f0378cfdde8bcfbb4c
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Aug 9 14:23:50 2018 +0900

    [NEMO-160] Handle Beam VoidCoder properly (#93)
    
    JIRA: [NEMO-160: Handle Beam VoidCoder properly](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-160)
    
    **Major changes:**
    - Proper support for (1) empty, or (2) 'null'-value input in DataFetchers
    - Allow 'null' values emitted by user code (e.g., Create.of((Void) null) by Beam WriteFiles) to be passed around in the data plane
    - Remove specific assumptions about Beam VoidCoder in the data plane (e.g., Void.TYPE), and use the coder transparently just like we use any other coders
    - DataFetcher throws a NoSuchElementException instead of returning 'null', when indicating that all data has been fetched
    
    **Minor changes to note:**
    - Cleans up tests
    
    **Tests for the changes:**
    - ParentTaskDataFetcherTest#testEmpty, testNull
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-160](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-160)
---
 .../executor/datatransfer/OutputCollectorImpl.java |  81 ++++--------
 .../nemo/runtime/executor/task/DataFetcher.java    |   6 +-
 .../executor/task/ParentTaskDataFetcher.java       | 143 +++++++++------------
 .../executor/task/SourceVertexDataFetcher.java     |  15 ++-
 .../nemo/runtime/executor/task/TaskExecutor.java   |  60 ++++-----
 .../executor/task/ParentTaskDataFetcherTest.java   |  53 ++------
 6 files changed, 139 insertions(+), 219 deletions(-)

diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index 32b9352..e6433fd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -17,11 +17,7 @@ package edu.snu.nemo.runtime.executor.datatransfer;
 
 import edu.snu.nemo.common.ir.OutputCollector;
 
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 
 /**
  * OutputCollector implementation.
@@ -29,92 +25,61 @@ import java.util.Queue;
  * @param <O> output type.
  */
 public final class OutputCollectorImpl<O> implements OutputCollector<O> {
-  private final Queue<O> mainTagOutputQueue;
-  private final Map<String, Queue<Object>> additionalTagOutputQueues;
-
-  /**
-   * Constructor of a new OutputCollectorImpl.
-   */
-  public OutputCollectorImpl() {
-    this.mainTagOutputQueue = new ArrayDeque<>(1);
-    this.additionalTagOutputQueues = new HashMap<>();
-  }
+  // Use ArrayList (not Queue) to allow 'null' values
+  private final ArrayList<O> mainTagElements;
+  private final Map<String, ArrayList<Object>> additionalTagElementsMap;
 
   /**
    * Constructor of a new OutputCollectorImpl with tagged outputs.
    * @param taggedChildren tagged children
    */
   public OutputCollectorImpl(final List<String> taggedChildren) {
-    this.mainTagOutputQueue = new ArrayDeque<>(1);
-    this.additionalTagOutputQueues = new HashMap<>();
-    taggedChildren.forEach(child -> this.additionalTagOutputQueues.put(child, new ArrayDeque<>(1)));
+    this.mainTagElements = new ArrayList<>(1);
+    this.additionalTagElementsMap = new HashMap<>();
+    taggedChildren.forEach(child -> this.additionalTagElementsMap.put(child, new ArrayList<>(1)));
   }
 
   @Override
   public void emit(final O output) {
-    mainTagOutputQueue.add(output);
+    mainTagElements.add(output);
   }
 
   @Override
   public <T> void emit(final String dstVertexId, final T output) {
-    if (this.additionalTagOutputQueues.get(dstVertexId) == null) {
+    if (this.additionalTagElementsMap.get(dstVertexId) == null) {
       // This dstVertexId is for the main tag
       emit((O) output);
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      this.additionalTagOutputQueues.get(dstVertexId).add(output);
+      this.additionalTagElementsMap.get(dstVertexId).add(output);
     }
   }
 
-  /**
-   * Inter-Task data is transferred from sender-side Task's OutputCollectorImpl
-   * to receiver-side Task.
-   *
-   * @return the first element of this list
-   */
-  public O remove() {
-    return mainTagOutputQueue.remove();
+  public Iterable<O> iterateMain() {
+    return mainTagElements;
   }
 
-  /**
-   * Inter-task data is transferred from sender-side Task's OutputCollectorImpl
-   * to receiver-side Task.
-   *
-   * @param tag output tag
-   * @return the first element of corresponding list
-   */
-  public Object remove(final String tag) {
-    if (this.additionalTagOutputQueues.get(tag) == null) {
+  public Iterable<Object> iterateTag(final String tag) {
+    if (this.additionalTagElementsMap.get(tag) == null) {
       // This dstVertexId is for the main tag
-      return remove();
+      return (Iterable<Object>) iterateMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      return this.additionalTagOutputQueues.get(tag).remove();
+      return this.additionalTagElementsMap.get(tag);
     }
-
   }
 
-  /**
-   * Check if this OutputCollector is empty.
-   *
-   * @return true if this OutputCollector is empty.
-   */
-  public boolean isEmpty() {
-    return mainTagOutputQueue.isEmpty();
+  public void clearMain() {
+    mainTagElements.clear();
   }
 
-  /**
-   * Check if this OutputCollector is empty.
-   *
-   * @param tag output tag
-   * @return true if this OutputCollector is empty.
-   */
-  public boolean isEmpty(final String tag) {
-    if (this.additionalTagOutputQueues.get(tag) == null) {
-      return isEmpty();
+  public void clearTag(final String tag) {
+    if (this.additionalTagElementsMap.get(tag) == null) {
+      // This dstVertexId is for the main tag
+      clearMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      return this.additionalTagOutputQueues.get(tag).isEmpty();
+      this.additionalTagElementsMap.get(tag).clear();
     }
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
index c7aeb0e..bb80e1a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
@@ -40,9 +40,9 @@ abstract class DataFetcher {
 
   /**
    * Can block until the next data element becomes available.
-   *
-   * @return null if there's no more data element.
-   * @throws IOException while fetching data
+   * @return data element
+   * @throws IOException upon I/O error
+   * @throws java.util.NoSuchElementException if no more element is available
    */
   abstract Object fetchDataElement() throws IOException;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 91c80d0..0193bae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,8 +15,6 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
-import edu.snu.nemo.common.coder.DecoderFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -26,7 +24,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -41,11 +39,10 @@ class ParentTaskDataFetcher extends DataFetcher {
   private final LinkedBlockingQueue iteratorQueue;
 
   // Non-finals (lazy fetching)
-  private boolean hasFetchStarted;
+  private boolean firstFetch;
   private int expectedNumOfIterators;
   private DataUtil.IteratorWithNumBytes currentIterator;
   private int currentIteratorIndex;
-  private boolean noElementAtAll = true;
   private long serBytes = 0;
   private long encodedBytes = 0;
 
@@ -55,85 +52,35 @@ class ParentTaskDataFetcher extends DataFetcher {
                         final boolean isToSideInput) {
     super(dataSource, child, readerForParentTask.isSideInputReader(), isToSideInput);
     this.readersForParentTask = readerForParentTask;
-    this.hasFetchStarted = false;
+    this.firstFetch = true;
     this.currentIteratorIndex = 0;
     this.iteratorQueue = new LinkedBlockingQueue<>();
   }
 
-  private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
-    try {
-      serBytes += iterator.getNumSerializedBytes();
-    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
-      serBytes = -1;
-    } catch (final IllegalStateException e) {
-      LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
-    }
-    try {
-      encodedBytes += iterator.getNumEncodedBytes();
-    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
-      encodedBytes = -1;
-    } catch (final IllegalStateException e) {
-      LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
-    }
-  }
-
-  /**
-   * Blocking call.
-   */
-  private void fetchInBackground() {
-    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
-    this.expectedNumOfIterators = futures.size();
-
-    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
-      try {
-        if (exception != null) {
-          iteratorQueue.put(exception); // can block here
-        } else {
-          iteratorQueue.put(iterator); // can block here
-        }
-      } catch (final InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e); // This shouldn't happen
-      }
-    }));
-  }
-
   @Override
   Object fetchDataElement() throws IOException {
     try {
-      if (!hasFetchStarted) {
-        fetchInBackground();
+      if (firstFetch) {
+        fetchDataLazily();
         advanceIterator();
+        firstFetch = false;
       }
 
-      if (this.currentIterator.hasNext()) {
-        // This iterator has an element available
-        noElementAtAll = false;
-        return this.currentIterator.next();
-      } else {
-        if (currentIteratorIndex == expectedNumOfIterators) {
-          // Entire fetcher is done
-          if (noElementAtAll) {
-            final Optional<DecoderFactory> decoderFactory =
-                readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class);
-
-            // TODO #173: Properly handle zero-element task outputs. Currently fetchDataElement relies on
-            // toString() method to distinguish whether to return Void.TYPE or not.
-            if (decoderFactory.get().toString().equals("VoidCoder")) {
-              noElementAtAll = false;
-              return Void.TYPE;
-            } else {
-              return null;
-            }
-          } else {
-            // This whole fetcher's done
-            return null;
-          }
-        } else {
-          // Advance to the next one
+      while (true) {
+        // This iterator has the element
+        if (this.currentIterator.hasNext()) {
+          return this.currentIterator.next();
+        }
+
+        // This iterator does not have the element
+        if (currentIteratorIndex < expectedNumOfIterators) {
+          // Next iterator has the element
           countBytes(currentIterator);
           advanceIterator();
-          return fetchDataElement();
+          continue;
+        } else {
+          // We've consumed all the iterators
+          break;
         }
       }
     } catch (final Throwable e) {
@@ -144,33 +91,71 @@ class ParentTaskDataFetcher extends DataFetcher {
       // "throw Exception" that the TaskExecutor thread can catch and handle.
       throw new IOException(e);
     }
+
+    // We throw the exception here, outside of the above try-catch region
+    throw new NoSuchElementException();
   }
 
-  private void advanceIterator() throws Throwable {
+  private void advanceIterator() throws IOException {
     // Take from iteratorQueue
     final Object iteratorOrThrowable;
     try {
-      iteratorOrThrowable = iteratorQueue.take();
+      iteratorOrThrowable = iteratorQueue.take(); // blocking call
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
     }
 
     // Handle iteratorOrThrowable
     if (iteratorOrThrowable instanceof Throwable) {
-      throw (Throwable) iteratorOrThrowable;
+      throw new IOException((Throwable) iteratorOrThrowable);
     } else {
       // This iterator is valid. Do advance.
-      hasFetchStarted = true;
       this.currentIterator = (DataUtil.IteratorWithNumBytes) iteratorOrThrowable;
       this.currentIteratorIndex++;
     }
   }
 
-  public final long getSerializedBytes() {
+  private void fetchDataLazily() throws IOException {
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
+    this.expectedNumOfIterators = futures.size();
+
+    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+      try {
+        if (exception != null) {
+          iteratorQueue.put(exception);
+        } else {
+          iteratorQueue.put(iterator);
+        }
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e); // this should not happen
+      }
+    }));
+  }
+
+  final long getSerializedBytes() {
     return serBytes;
   }
 
-  public final long getEncodedBytes() {
+  final long getEncodedBytes() {
     return encodedBytes;
   }
+
+  private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
+    try {
+      serBytes += iterator.getNumSerializedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      serBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
+    }
+    try {
+      encodedBytes += iterator.getNumEncodedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      encodedBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
+    }
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 817139b..425cb46 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 /**
  * Fetches data from a data source.
@@ -42,19 +43,23 @@ class SourceVertexDataFetcher extends DataFetcher {
   @Override
   Object fetchDataElement() throws IOException {
     if (iterator == null) {
-      final long start = System.currentTimeMillis();
-      iterator = this.readable.read().iterator();
-      boundedSourceReadTime += System.currentTimeMillis() - start;
+      fetchDataLazily();
     }
 
     if (iterator.hasNext()) {
       return iterator.next();
     } else {
-      return null;
+      throw new NoSuchElementException();
     }
   }
 
-  public final long getBoundedSourceReadTime() {
+  private void fetchDataLazily() throws IOException {
+    final long start = System.currentTimeMillis();
+    iterator = this.readable.read().iterator();
+    boundedSourceReadTime += System.currentTimeMillis() - start;
+  }
+
+  final long getBoundedSourceReadTime() {
     return boundedSourceReadTime;
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 809d747..807e5bb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -220,19 +220,15 @@ public final class TaskExecutor {
     }
 
     // Given a single input element, a vertex can produce many output elements.
-    // Here, we recursively process all of the main output elements.
-    while (!outputCollector.isEmpty()) {
-      final Object element = outputCollector.remove();
-      handleMainOutputElement(vertexHarness, element); // Recursion
-    }
+    // Here, we recursively process all of the main oltput elements.
+    outputCollector.iterateMain().forEach(element -> handleMainOutputElement(vertexHarness, element)); // Recursion
+    outputCollector.clearMain();
 
     // Recursively process all of the additional output elements.
-    vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(value -> {
-      final String dstVertexId = (String) value;
-      while (!outputCollector.isEmpty(dstVertexId)) {
-        final Object element = outputCollector.remove(dstVertexId);
-        handleAdditionalOutputElement(vertexHarness, element, dstVertexId); // Recursion
-      }
+    vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(tag -> {
+      outputCollector.iterateTag(tag).forEach(
+          element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+      outputCollector.clearTag(tag);
     });
   }
 
@@ -326,17 +322,14 @@ public final class TaskExecutor {
     final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
 
     // handle main outputs
-    while (!outputCollector.isEmpty()) {
-      final Object element = outputCollector.remove();
-      handleMainOutputElement(vertexHarness, element);
-    }
+    outputCollector.iterateMain().forEach(element -> handleMainOutputElement(vertexHarness, element)); // Recursion
+    outputCollector.clearMain();
 
     // handle additional tagged outputs
     vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-      while (!outputCollector.isEmpty(tag)) {
-        final Object element = outputCollector.remove(tag);
-        handleAdditionalOutputElement(vertexHarness, element, tag);
-      }
+      outputCollector.iterateTag(tag).forEach(
+          element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+      outputCollector.clearTag(tag);
     });
     finalizeOutputWriters(vertexHarness);
   }
@@ -383,16 +376,11 @@ public final class TaskExecutor {
       for (int i = 0; i < availableFetchers.size(); i++) {
         final DataFetcher dataFetcher = availableFetchers.get(i);
         final Object element;
+
         try {
           element = dataFetcher.fetchDataElement();
-        } catch (IOException e) {
-          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
-              Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
-          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e.toString());
-          return false;
-        }
-
-        if (element == null) {
+        } catch (NoSuchElementException e) {
+          // We've consumed all the data from this data fetcher.
           if (dataFetcher instanceof SourceVertexDataFetcher) {
             boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
           } else if (dataFetcher instanceof ParentTaskDataFetcher) {
@@ -401,12 +389,19 @@ public final class TaskExecutor {
           }
           finishedFetcherIndex = i;
           break;
+        } catch (IOException e) {
+          // IOException means that this task should be retried.
+          taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+              Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e.toString());
+          return false;
+        }
+
+        // Successfully fetched an element
+        if (dataFetcher.isFromSideInput()) {
+          sideInputMap.put(((OperatorVertex) dataFetcher.getDataSource()).getTransform().getTag(), element);
         } else {
-          if (dataFetcher.isFromSideInput()) {
-            sideInputMap.put(((OperatorVertex) dataFetcher.getDataSource()).getTransform().getTag(), element);
-          } else {
-            processElementRecursively(dataFetcher.getChild(), element);
-          }
+          processElementRecursively(dataFetcher.getChild(), element);
         }
       }
 
@@ -588,6 +583,7 @@ public final class TaskExecutor {
     // finalize OutputWriters for additional tagged children
     vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter -> {
       outputWriter.close();
+
       final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
       writtenBytes.ifPresent(writtenBytesList::add);
     });
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 3f1fd3d..47c2d19 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -15,11 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.task;
 
-import edu.snu.nemo.common.coder.DecoderFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
@@ -35,7 +31,6 @@ import java.util.concurrent.Executors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.when;
 
 /**
@@ -45,31 +40,28 @@ import static org.mockito.Mockito.when;
 @PrepareForTest({InputReader.class, VertexHarness.class})
 public final class ParentTaskDataFetcherTest {
 
-  @Test(timeout=5000)
-  public void testVoid() throws Exception {
-    // TODO #173: Properly handle zero-element. This test should be updated too.
-    final List<String> dataElements = new ArrayList<>(0); // empty data
-    final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
-        "VoidCoder");
+  @Test(timeout=5000, expected = NoSuchElementException.class)
+  public void testEmpty() throws Exception {
+    final List<String> empty = new ArrayList<>(0); // empty data
+    final InputReader inputReader = generateInputReader(generateCompletableFuture(empty.iterator()));
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
 
-    // Should return Void.TYPE
-    assertEquals(Void.TYPE, fetcher.fetchDataElement());
+    // Should trigger the expected 'NoSuchElementException'
+    fetcher.fetchDataElement();
   }
 
   @Test(timeout=5000)
-  public void testEmpty() throws Exception {
-    // TODO #173: Properly handle zero-element. This test should be updated too.
-    final List<String> dataElements = new ArrayList<>(0); // empty data
-    final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
-        "IntCoder");
+  public void testNull() throws Exception {
+    final List<String> oneNull = new ArrayList<>(1); // empty data
+    oneNull.add(null);
+    final InputReader inputReader = generateInputReader(generateCompletableFuture(oneNull.iterator()));
 
     // Fetcher
     final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
 
-    // Should return Void.TYPE
+    // Should return 'null'
     assertEquals(null, fetcher.fetchDataElement());
   }
 
@@ -86,7 +78,6 @@ public final class ParentTaskDataFetcherTest {
 
     // Should return only a single element
     assertEquals(singleData, fetcher.fetchDataElement());
-    assertEquals(null, fetcher.fetchDataElement());
   }
 
   @Test(timeout=5000, expected = IOException.class)
@@ -131,28 +122,6 @@ public final class ParentTaskDataFetcherTest {
         false);
   }
 
-
-  private DecoderFactory generateCoder(final String coder) {
-    final DecoderFactory decoderFactory = mock(DecoderFactory.class);
-    when(decoderFactory.toString()).thenReturn(coder);
-    return decoderFactory;
-  }
-
-  private RuntimeEdge generateEdge(final String coder) {
-    final String runtimeIREdgeId = "Runtime edge with coder";
-    final ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
-    edgeProperties.put(DecoderProperty.of(generateCoder(coder)));
-    return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, mock(IRVertex.class), mock(IRVertex.class), false);
-  }
-
-  private InputReader generateInputReaderWithCoder(final CompletableFuture completableFuture, final String coder) {
-    final InputReader inputReader = mock(InputReader.class);
-    when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
-    final RuntimeEdge runtimeEdge = generateEdge(coder);
-    when(inputReader.getRuntimeEdge()).thenReturn(runtimeEdge);
-    return inputReader;
-  }
-
   private InputReader generateInputReader(final CompletableFuture completableFuture) {
     final InputReader inputReader = mock(InputReader.class);
     when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));