You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/08/09 05:23:52 UTC
[incubator-nemo] branch master updated: [NEMO-160] Handle Beam
VoidCoder properly (#93)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 0ec68b6 [NEMO-160] Handle Beam VoidCoder properly (#93)
0ec68b6 is described below
commit 0ec68b6f0be0f20969eda0f0378cfdde8bcfbb4c
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Aug 9 14:23:50 2018 +0900
[NEMO-160] Handle Beam VoidCoder properly (#93)
JIRA: [NEMO-160: Handle Beam VoidCoder properly](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-160)
**Major changes:**
- Proper support for (1) empty, or (2) 'null'-value input in DataFetchers
- Allow 'null' values emitted by user code (e.g., Create.of((Void) null) by Beam WriteFiles) to be passed around in the data plane
- Remove specific assumptions about Beam VoidCoder in the data plane (e.g., Void.TYPE), and use the coder transparently just like we use any other coders
- DataFetcher throws a NoSuchElementException instead of returning 'null', when indicating that all data has been fetched
**Minor changes to note:**
- Cleans up tests
**Tests for the changes:**
- ParentTaskDataFetcherTest#testEmpty, testNull
**Other comments:**
- N/A
resolves [NEMO-160](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-160)
---
.../executor/datatransfer/OutputCollectorImpl.java | 81 ++++--------
.../nemo/runtime/executor/task/DataFetcher.java | 6 +-
.../executor/task/ParentTaskDataFetcher.java | 143 +++++++++------------
.../executor/task/SourceVertexDataFetcher.java | 15 ++-
.../nemo/runtime/executor/task/TaskExecutor.java | 60 ++++-----
.../executor/task/ParentTaskDataFetcherTest.java | 53 ++------
6 files changed, 139 insertions(+), 219 deletions(-)
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index 32b9352..e6433fd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -17,11 +17,7 @@ package edu.snu.nemo.runtime.executor.datatransfer;
import edu.snu.nemo.common.ir.OutputCollector;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
/**
* OutputCollector implementation.
@@ -29,92 +25,61 @@ import java.util.Queue;
* @param <O> output type.
*/
public final class OutputCollectorImpl<O> implements OutputCollector<O> {
- private final Queue<O> mainTagOutputQueue;
- private final Map<String, Queue<Object>> additionalTagOutputQueues;
-
- /**
- * Constructor of a new OutputCollectorImpl.
- */
- public OutputCollectorImpl() {
- this.mainTagOutputQueue = new ArrayDeque<>(1);
- this.additionalTagOutputQueues = new HashMap<>();
- }
+ // Use ArrayList (not Queue) to allow 'null' values
+ private final ArrayList<O> mainTagElements;
+ private final Map<String, ArrayList<Object>> additionalTagElementsMap;
/**
* Constructor of a new OutputCollectorImpl with tagged outputs.
* @param taggedChildren tagged children
*/
public OutputCollectorImpl(final List<String> taggedChildren) {
- this.mainTagOutputQueue = new ArrayDeque<>(1);
- this.additionalTagOutputQueues = new HashMap<>();
- taggedChildren.forEach(child -> this.additionalTagOutputQueues.put(child, new ArrayDeque<>(1)));
+ this.mainTagElements = new ArrayList<>(1);
+ this.additionalTagElementsMap = new HashMap<>();
+ taggedChildren.forEach(child -> this.additionalTagElementsMap.put(child, new ArrayList<>(1)));
}
@Override
public void emit(final O output) {
- mainTagOutputQueue.add(output);
+ mainTagElements.add(output);
}
@Override
public <T> void emit(final String dstVertexId, final T output) {
- if (this.additionalTagOutputQueues.get(dstVertexId) == null) {
+ if (this.additionalTagElementsMap.get(dstVertexId) == null) {
// This dstVertexId is for the main tag
emit((O) output);
} else {
// Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
- this.additionalTagOutputQueues.get(dstVertexId).add(output);
+ this.additionalTagElementsMap.get(dstVertexId).add(output);
}
}
- /**
- * Inter-Task data is transferred from sender-side Task's OutputCollectorImpl
- * to receiver-side Task.
- *
- * @return the first element of this list
- */
- public O remove() {
- return mainTagOutputQueue.remove();
+ public Iterable<O> iterateMain() {
+ return mainTagElements;
}
- /**
- * Inter-task data is transferred from sender-side Task's OutputCollectorImpl
- * to receiver-side Task.
- *
- * @param tag output tag
- * @return the first element of corresponding list
- */
- public Object remove(final String tag) {
- if (this.additionalTagOutputQueues.get(tag) == null) {
+ public Iterable<Object> iterateTag(final String tag) {
+ if (this.additionalTagElementsMap.get(tag) == null) {
// This dstVertexId is for the main tag
- return remove();
+ return (Iterable<Object>) iterateMain();
} else {
// Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
- return this.additionalTagOutputQueues.get(tag).remove();
+ return this.additionalTagElementsMap.get(tag);
}
-
}
- /**
- * Check if this OutputCollector is empty.
- *
- * @return true if this OutputCollector is empty.
- */
- public boolean isEmpty() {
- return mainTagOutputQueue.isEmpty();
+ public void clearMain() {
+ mainTagElements.clear();
}
- /**
- * Check if this OutputCollector is empty.
- *
- * @param tag output tag
- * @return true if this OutputCollector is empty.
- */
- public boolean isEmpty(final String tag) {
- if (this.additionalTagOutputQueues.get(tag) == null) {
- return isEmpty();
+ public void clearTag(final String tag) {
+ if (this.additionalTagElementsMap.get(tag) == null) {
+ // This dstVertexId is for the main tag
+ clearMain();
} else {
// Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
- return this.additionalTagOutputQueues.get(tag).isEmpty();
+ this.additionalTagElementsMap.get(tag).clear();
}
}
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
index c7aeb0e..bb80e1a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
@@ -40,9 +40,9 @@ abstract class DataFetcher {
/**
* Can block until the next data element becomes available.
- *
- * @return null if there's no more data element.
- * @throws IOException while fetching data
+ * @return data element
+ * @throws IOException upon I/O error
+ * @throws java.util.NoSuchElementException if no more element is available
*/
abstract Object fetchDataElement() throws IOException;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 91c80d0..0193bae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -15,8 +15,6 @@
*/
package edu.snu.nemo.runtime.executor.task;
-import edu.snu.nemo.common.coder.DecoderFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -26,7 +24,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
+import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,11 +39,10 @@ class ParentTaskDataFetcher extends DataFetcher {
private final LinkedBlockingQueue iteratorQueue;
// Non-finals (lazy fetching)
- private boolean hasFetchStarted;
+ private boolean firstFetch;
private int expectedNumOfIterators;
private DataUtil.IteratorWithNumBytes currentIterator;
private int currentIteratorIndex;
- private boolean noElementAtAll = true;
private long serBytes = 0;
private long encodedBytes = 0;
@@ -55,85 +52,35 @@ class ParentTaskDataFetcher extends DataFetcher {
final boolean isToSideInput) {
super(dataSource, child, readerForParentTask.isSideInputReader(), isToSideInput);
this.readersForParentTask = readerForParentTask;
- this.hasFetchStarted = false;
+ this.firstFetch = true;
this.currentIteratorIndex = 0;
this.iteratorQueue = new LinkedBlockingQueue<>();
}
- private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
- try {
- serBytes += iterator.getNumSerializedBytes();
- } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
- serBytes = -1;
- } catch (final IllegalStateException e) {
- LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
- }
- try {
- encodedBytes += iterator.getNumEncodedBytes();
- } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
- encodedBytes = -1;
- } catch (final IllegalStateException e) {
- LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
- }
- }
-
- /**
- * Blocking call.
- */
- private void fetchInBackground() {
- final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
- this.expectedNumOfIterators = futures.size();
-
- futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
- try {
- if (exception != null) {
- iteratorQueue.put(exception); // can block here
- } else {
- iteratorQueue.put(iterator); // can block here
- }
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e); // This shouldn't happen
- }
- }));
- }
-
@Override
Object fetchDataElement() throws IOException {
try {
- if (!hasFetchStarted) {
- fetchInBackground();
+ if (firstFetch) {
+ fetchDataLazily();
advanceIterator();
+ firstFetch = false;
}
- if (this.currentIterator.hasNext()) {
- // This iterator has an element available
- noElementAtAll = false;
- return this.currentIterator.next();
- } else {
- if (currentIteratorIndex == expectedNumOfIterators) {
- // Entire fetcher is done
- if (noElementAtAll) {
- final Optional<DecoderFactory> decoderFactory =
- readersForParentTask.getRuntimeEdge().getPropertyValue(DecoderProperty.class);
-
- // TODO #173: Properly handle zero-element task outputs. Currently fetchDataElement relies on
- // toString() method to distinguish whether to return Void.TYPE or not.
- if (decoderFactory.get().toString().equals("VoidCoder")) {
- noElementAtAll = false;
- return Void.TYPE;
- } else {
- return null;
- }
- } else {
- // This whole fetcher's done
- return null;
- }
- } else {
- // Advance to the next one
+ while (true) {
+ // This iterator has the element
+ if (this.currentIterator.hasNext()) {
+ return this.currentIterator.next();
+ }
+
+ // This iterator does not have the element
+ if (currentIteratorIndex < expectedNumOfIterators) {
+ // More iterators remain: move on to the next one (it may also be empty, so we loop and re-check)
countBytes(currentIterator);
advanceIterator();
- return fetchDataElement();
+ continue;
+ } else {
+ // We've consumed all the iterators
+ break;
}
}
} catch (final Throwable e) {
@@ -144,33 +91,71 @@ class ParentTaskDataFetcher extends DataFetcher {
// "throw Exception" that the TaskExecutor thread can catch and handle.
throw new IOException(e);
}
+
+ // We throw the exception here, outside of the above try-catch region
+ throw new NoSuchElementException();
}
- private void advanceIterator() throws Throwable {
+ private void advanceIterator() throws IOException {
// Take from iteratorQueue
final Object iteratorOrThrowable;
try {
- iteratorOrThrowable = iteratorQueue.take();
+ iteratorOrThrowable = iteratorQueue.take(); // blocking call
} catch (InterruptedException e) {
- throw e;
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
}
// Handle iteratorOrThrowable
if (iteratorOrThrowable instanceof Throwable) {
- throw (Throwable) iteratorOrThrowable;
+ throw new IOException((Throwable) iteratorOrThrowable);
} else {
// This iterator is valid. Do advance.
- hasFetchStarted = true;
this.currentIterator = (DataUtil.IteratorWithNumBytes) iteratorOrThrowable;
this.currentIteratorIndex++;
}
}
- public final long getSerializedBytes() {
+ private void fetchDataLazily() throws IOException {
+ final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
+ this.expectedNumOfIterators = futures.size();
+
+ futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+ try {
+ if (exception != null) {
+ iteratorQueue.put(exception);
+ } else {
+ iteratorQueue.put(iterator);
+ }
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e); // this should not happen
+ }
+ }));
+ }
+
+ final long getSerializedBytes() {
return serBytes;
}
- public final long getEncodedBytes() {
+ final long getEncodedBytes() {
return encodedBytes;
}
+
+ private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
+ try {
+ serBytes += iterator.getNumSerializedBytes();
+ } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+ serBytes = -1;
+ } catch (final IllegalStateException e) {
+ LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet ", e);
+ }
+ try {
+ encodedBytes += iterator.getNumEncodedBytes();
+ } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+ encodedBytes = -1;
+ } catch (final IllegalStateException e) {
+ LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
+ }
+ }
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 817139b..425cb46 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.common.ir.vertex.IRVertex;
import java.io.IOException;
import java.util.Iterator;
+import java.util.NoSuchElementException;
/**
* Fetches data from a data source.
@@ -42,19 +43,23 @@ class SourceVertexDataFetcher extends DataFetcher {
@Override
Object fetchDataElement() throws IOException {
if (iterator == null) {
- final long start = System.currentTimeMillis();
- iterator = this.readable.read().iterator();
- boundedSourceReadTime += System.currentTimeMillis() - start;
+ fetchDataLazily();
}
if (iterator.hasNext()) {
return iterator.next();
} else {
- return null;
+ throw new NoSuchElementException();
}
}
- public final long getBoundedSourceReadTime() {
+ private void fetchDataLazily() throws IOException {
+ final long start = System.currentTimeMillis();
+ iterator = this.readable.read().iterator();
+ boundedSourceReadTime += System.currentTimeMillis() - start;
+ }
+
+ final long getBoundedSourceReadTime() {
return boundedSourceReadTime;
}
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 809d747..807e5bb 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -220,19 +220,15 @@ public final class TaskExecutor {
}
// Given a single input element, a vertex can produce many output elements.
- // Here, we recursively process all of the main output elements.
- while (!outputCollector.isEmpty()) {
- final Object element = outputCollector.remove();
- handleMainOutputElement(vertexHarness, element); // Recursion
- }
+ // Here, we recursively process all of the main output elements.
+ outputCollector.iterateMain().forEach(element -> handleMainOutputElement(vertexHarness, element)); // Recursion
+ outputCollector.clearMain();
// Recursively process all of the additional output elements.
- vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(value -> {
- final String dstVertexId = (String) value;
- while (!outputCollector.isEmpty(dstVertexId)) {
- final Object element = outputCollector.remove(dstVertexId);
- handleAdditionalOutputElement(vertexHarness, element, dstVertexId); // Recursion
- }
+ vertexHarness.getContext().getAdditionalTagOutputs().values().forEach(tag -> {
+ outputCollector.iterateTag(tag).forEach(
+ element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+ outputCollector.clearTag(tag);
});
}
@@ -326,17 +322,14 @@ public final class TaskExecutor {
final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
// handle main outputs
- while (!outputCollector.isEmpty()) {
- final Object element = outputCollector.remove();
- handleMainOutputElement(vertexHarness, element);
- }
+ outputCollector.iterateMain().forEach(element -> handleMainOutputElement(vertexHarness, element)); // Recursion
+ outputCollector.clearMain();
// handle additional tagged outputs
vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
- while (!outputCollector.isEmpty(tag)) {
- final Object element = outputCollector.remove(tag);
- handleAdditionalOutputElement(vertexHarness, element, tag);
- }
+ outputCollector.iterateTag(tag).forEach(
+ element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+ outputCollector.clearTag(tag);
});
finalizeOutputWriters(vertexHarness);
}
@@ -383,16 +376,11 @@ public final class TaskExecutor {
for (int i = 0; i < availableFetchers.size(); i++) {
final DataFetcher dataFetcher = availableFetchers.get(i);
final Object element;
+
try {
element = dataFetcher.fetchDataElement();
- } catch (IOException e) {
- taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
- Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
- LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e.toString());
- return false;
- }
-
- if (element == null) {
+ } catch (NoSuchElementException e) {
+ // We've consumed all the data from this data fetcher.
if (dataFetcher instanceof SourceVertexDataFetcher) {
boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
} else if (dataFetcher instanceof ParentTaskDataFetcher) {
@@ -401,12 +389,19 @@ public final class TaskExecutor {
}
finishedFetcherIndex = i;
break;
+ } catch (IOException e) {
+ // IOException means that this task should be retried.
+ taskStateManager.onTaskStateChanged(TaskState.State.SHOULD_RETRY,
+ Optional.empty(), Optional.of(TaskState.RecoverableTaskFailureCause.INPUT_READ_FAILURE));
+ LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}", taskId, e.toString());
+ return false;
+ }
+
+ // Successfully fetched an element
+ if (dataFetcher.isFromSideInput()) {
+ sideInputMap.put(((OperatorVertex) dataFetcher.getDataSource()).getTransform().getTag(), element);
} else {
- if (dataFetcher.isFromSideInput()) {
- sideInputMap.put(((OperatorVertex) dataFetcher.getDataSource()).getTransform().getTag(), element);
- } else {
- processElementRecursively(dataFetcher.getChild(), element);
- }
+ processElementRecursively(dataFetcher.getChild(), element);
}
}
@@ -588,6 +583,7 @@ public final class TaskExecutor {
// finalize OutputWriters for additional tagged children
vertexHarness.getWritersToAdditionalChildrenTasks().values().forEach(outputWriter -> {
outputWriter.close();
+
final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
writtenBytes.ifPresent(writtenBytesList::add);
});
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 3f1fd3d..47c2d19 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -15,11 +15,7 @@
*/
package edu.snu.nemo.runtime.executor.task;
-import edu.snu.nemo.common.coder.DecoderFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
import org.junit.Test;
@@ -35,7 +31,6 @@ import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockingDetails;
import static org.mockito.Mockito.when;
/**
@@ -45,31 +40,28 @@ import static org.mockito.Mockito.when;
@PrepareForTest({InputReader.class, VertexHarness.class})
public final class ParentTaskDataFetcherTest {
- @Test(timeout=5000)
- public void testVoid() throws Exception {
- // TODO #173: Properly handle zero-element. This test should be updated too.
- final List<String> dataElements = new ArrayList<>(0); // empty data
- final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
- "VoidCoder");
+ @Test(timeout=5000, expected = NoSuchElementException.class)
+ public void testEmpty() throws Exception {
+ final List<String> empty = new ArrayList<>(0); // empty data
+ final InputReader inputReader = generateInputReader(generateCompletableFuture(empty.iterator()));
// Fetcher
final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
- // Should return Void.TYPE
- assertEquals(Void.TYPE, fetcher.fetchDataElement());
+ // Should trigger the expected 'NoSuchElementException'
+ fetcher.fetchDataElement();
}
@Test(timeout=5000)
- public void testEmpty() throws Exception {
- // TODO #173: Properly handle zero-element. This test should be updated too.
- final List<String> dataElements = new ArrayList<>(0); // empty data
- final InputReader inputReader = generateInputReaderWithCoder(generateCompletableFuture(dataElements.iterator()),
- "IntCoder");
+ public void testNull() throws Exception {
+ final List<String> oneNull = new ArrayList<>(1); // a single 'null' element
+ oneNull.add(null);
+ final InputReader inputReader = generateInputReader(generateCompletableFuture(oneNull.iterator()));
// Fetcher
final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
- // Should return Void.TYPE
+ // Should return 'null'
assertEquals(null, fetcher.fetchDataElement());
}
@@ -86,7 +78,6 @@ public final class ParentTaskDataFetcherTest {
// Should return only a single element
assertEquals(singleData, fetcher.fetchDataElement());
- assertEquals(null, fetcher.fetchDataElement());
}
@Test(timeout=5000, expected = IOException.class)
@@ -131,28 +122,6 @@ public final class ParentTaskDataFetcherTest {
false);
}
-
- private DecoderFactory generateCoder(final String coder) {
- final DecoderFactory decoderFactory = mock(DecoderFactory.class);
- when(decoderFactory.toString()).thenReturn(coder);
- return decoderFactory;
- }
-
- private RuntimeEdge generateEdge(final String coder) {
- final String runtimeIREdgeId = "Runtime edge with coder";
- final ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
- edgeProperties.put(DecoderProperty.of(generateCoder(coder)));
- return new RuntimeEdge<>(runtimeIREdgeId, edgeProperties, mock(IRVertex.class), mock(IRVertex.class), false);
- }
-
- private InputReader generateInputReaderWithCoder(final CompletableFuture completableFuture, final String coder) {
- final InputReader inputReader = mock(InputReader.class);
- when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
- final RuntimeEdge runtimeEdge = generateEdge(coder);
- when(inputReader.getRuntimeEdge()).thenReturn(runtimeEdge);
- return inputReader;
- }
-
private InputReader generateInputReader(final CompletableFuture completableFuture) {
final InputReader inputReader = mock(InputReader.class);
when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));