You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/06 04:27:38 UTC
[incubator-nemo] branch master updated: [NEMO-266] Throws
NoSuchElementException in Readeable.readCurrent #149
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 3626ae8 [NEMO-266] Throws NoSuchElementException in Readeable.readCurrent #149
3626ae8 is described below
commit 3626ae81d2ec68626505e1d94c7e0bcfc4e6d1aa
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Tue Nov 6 13:27:34 2018 +0900
[NEMO-266] Throws NoSuchElementException in Readeable.readCurrent #149
JIRA: [NEMO-266: Throws NoSuchElementException in Readeable.readCurrent](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-266)
**Major changes:**
- remove `advance` method in `Readable` and throws `NoSuchElementException` in `BeamUnboundedSourceVertex.readCurrent()`.
---
.../nemo/common/ir/BoundedIteratorReadable.java | 5 --
.../java/org/apache/nemo/common/ir/Readable.java | 5 --
.../nemo/common/ir/vertex/CachedSourceVertex.java | 6 ---
.../apache/nemo/common/test/EmptyComponents.java | 4 --
.../beam/source/BeamBoundedSourceVertex.java | 25 ++++------
.../beam/source/BeamUnboundedSourceVertex.java | 55 ++++++++--------------
.../executor/task/SourceVertexDataFetcher.java | 1 -
.../runtime/executor/task/TaskExecutorTest.java | 7 +--
8 files changed, 29 insertions(+), 79 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
index 4d15755..586fc47 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
@@ -48,11 +48,6 @@ public abstract class BoundedIteratorReadable<O> implements Readable<O> {
}
@Override
- public final void advance() {
- // do nothing
- }
-
- @Override
public final boolean isFinished() {
return !iterator.hasNext();
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
index e0f7b7c..785d662 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
@@ -45,11 +45,6 @@ public interface Readable<O> extends Serializable {
O readCurrent() throws NoSuchElementException;
/**
- * Advance current data point.
- */
- void advance() throws IOException;
-
- /**
* Read watermark.
* @return watermark
*/
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
index 6e909cf..ecdd34b 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -101,12 +101,6 @@ public final class CachedSourceVertex<T> extends SourceVertex<T> {
}
@Override
- public void advance() throws IOException {
- throw new UnsupportedOperationException(
- "CachedSourceVertex should not be used");
- }
-
- @Override
public long readWatermark() {
throw new UnsupportedOperationException(
"CachedSourceVertex should not be used");
diff --git a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 20bc145..ee1ea48 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -247,10 +247,6 @@ public final class EmptyComponents {
}
@Override
- public void advance() {
- }
-
- @Override
public long readWatermark() {
return 0;
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 880a3a0..bc672b7 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -27,7 +27,6 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.function.Function;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.beam.sdk.io.BoundedSource;
@@ -107,7 +106,6 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
private final BoundedSource<T> boundedSource;
private boolean finished = false;
private BoundedSource.BoundedReader<T> reader;
- private Function<T, WindowedValue<T>> windowedValueConverter;
/**
* Constructor of the BoundedSourceReadable.
@@ -122,16 +120,6 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
try {
reader = boundedSource.createReader(null);
finished = !reader.start();
-
- if (!finished) {
- T elem = reader.getCurrent();
-
- if (elem instanceof WindowedValue) {
- windowedValueConverter = val -> (WindowedValue) val;
- } else {
- windowedValueConverter = WindowedValue::valueInGlobalWindow;
- }
- }
} catch (final Exception e) {
throw new RuntimeException(e);
}
@@ -144,12 +132,15 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
}
final T elem = reader.getCurrent();
- return windowedValueConverter.apply(elem);
- }
- @Override
- public void advance() throws IOException {
- finished = !reader.advance();
+ try {
+ finished = !reader.advance();
+ } catch (final IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ return WindowedValue.valueInGlobalWindow(elem);
}
@Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 97adc5b..482dd9d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -31,7 +31,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.function.Function;
/**
* SourceVertex implementation for UnboundedSource.
@@ -100,8 +99,8 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
implements Readable<Object> {
private final UnboundedSource<O, M> unboundedSource;
private UnboundedSource.UnboundedReader<O> reader;
- private Function<O, WindowedValue<O>> windowedValueConverter;
- private boolean finished = false;
+ private boolean isStarted = false;
+ private boolean isCurrentAvailable = false;
UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
this.unboundedSource = unboundedSource;
@@ -111,45 +110,30 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
public void prepare() {
try {
reader = unboundedSource.createReader(null, null);
- reader.start();
} catch (final Exception e) {
throw new RuntimeException(e);
}
-
- // get first element
- final O firstElement = retrieveFirstElement();
- if (firstElement instanceof WindowedValue) {
- windowedValueConverter = val -> (WindowedValue) val;
- } else {
- windowedValueConverter = WindowedValue::valueInGlobalWindow;
- }
- }
-
- private O retrieveFirstElement() {
- while (true) {
- try {
- return reader.getCurrent();
- } catch (final NoSuchElementException e) {
- // the first element is not currently available... retry
- try {
- Thread.sleep(100);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- }
}
@Override
public Object readCurrent() {
- final O elem = reader.getCurrent();
- return windowedValueConverter.apply(elem);
- }
+ try {
+ if (!isStarted) {
+ isStarted = true;
+ isCurrentAvailable = reader.start();
+ } else {
+ isCurrentAvailable = reader.advance();
+ }
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
- @Override
- public void advance() throws IOException {
- reader.advance();
+ if (isCurrentAvailable) {
+ final O elem = reader.getCurrent();
+ return WindowedValue.timestampedValueInGlobalWindow(elem, reader.getCurrentTimestamp());
+ } else {
+ throw new NoSuchElementException();
+ }
}
@Override
@@ -159,7 +143,7 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
@Override
public boolean isFinished() {
- return finished;
+ return false;
}
@Override
@@ -169,7 +153,6 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
@Override
public void close() throws IOException {
- finished = true;
reader.close();
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 9ea8fa8..fa4bd8a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -105,7 +105,6 @@ class SourceVertexDataFetcher extends DataFetcher {
// Data
final Object element = readable.readCurrent();
- readable.advance();
return element;
}
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index e05bbfb..6ae716a 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -694,12 +694,9 @@ public final class TaskExecutorTest {
if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) {
throw new NoSuchElementException();
}
- return elements.get(pointer);
- }
-
- @Override
- public void advance() throws IOException {
+ final Object element = elements.get(pointer);
pointer += 1;
+ return element;
}
@Override