You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/06 04:27:38 UTC

[incubator-nemo] branch master updated: [NEMO-266] Throws NoSuchElementException in Readable.readCurrent #149

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 3626ae8  [NEMO-266] Throws NoSuchElementException in Readable.readCurrent #149
3626ae8 is described below

commit 3626ae81d2ec68626505e1d94c7e0bcfc4e6d1aa
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Tue Nov 6 13:27:34 2018 +0900

    [NEMO-266] Throws NoSuchElementException in Readable.readCurrent #149
    
    JIRA: [NEMO-266: Throws NoSuchElementException in Readable.readCurrent](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-266)
    
    **Major changes:**
    - Remove the `advance` method from `Readable`, and throw `NoSuchElementException` in `BeamUnboundedSourceVertex.readCurrent()` when no element is currently available.
---
 .../nemo/common/ir/BoundedIteratorReadable.java    |  5 --
 .../java/org/apache/nemo/common/ir/Readable.java   |  5 --
 .../nemo/common/ir/vertex/CachedSourceVertex.java  |  6 ---
 .../apache/nemo/common/test/EmptyComponents.java   |  4 --
 .../beam/source/BeamBoundedSourceVertex.java       | 25 ++++------
 .../beam/source/BeamUnboundedSourceVertex.java     | 55 ++++++++--------------
 .../executor/task/SourceVertexDataFetcher.java     |  1 -
 .../runtime/executor/task/TaskExecutorTest.java    |  7 +--
 8 files changed, 29 insertions(+), 79 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
index 4d15755..586fc47 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/BoundedIteratorReadable.java
@@ -48,11 +48,6 @@ public abstract class BoundedIteratorReadable<O> implements Readable<O> {
   }
 
   @Override
-  public final void advance() {
-    // do nothing
-  }
-
-  @Override
   public final boolean isFinished() {
     return !iterator.hasNext();
   }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/Readable.java b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
index e0f7b7c..785d662 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/Readable.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/Readable.java
@@ -45,11 +45,6 @@ public interface Readable<O> extends Serializable {
   O readCurrent() throws NoSuchElementException;
 
   /**
-   * Advance current data point.
-   */
-  void advance() throws IOException;
-
-  /**
    * Read watermark.
    * @return watermark
    */
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
index 6e909cf..ecdd34b 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/CachedSourceVertex.java
@@ -101,12 +101,6 @@ public final class CachedSourceVertex<T> extends SourceVertex<T> {
     }
 
     @Override
-    public void advance() throws IOException {
-      throw new UnsupportedOperationException(
-        "CachedSourceVertex should not be used");
-    }
-
-    @Override
     public long readWatermark() {
       throw new UnsupportedOperationException(
         "CachedSourceVertex should not be used");
diff --git a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 20bc145..ee1ea48 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -247,10 +247,6 @@ public final class EmptyComponents {
     }
 
     @Override
-    public void advance() {
-    }
-
-    @Override
     public long readWatermark() {
       return 0;
     }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 880a3a0..bc672b7 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -27,7 +27,6 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.function.Function;
 
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -107,7 +106,6 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
     private final BoundedSource<T> boundedSource;
     private boolean finished = false;
     private BoundedSource.BoundedReader<T> reader;
-    private Function<T, WindowedValue<T>> windowedValueConverter;
 
     /**
      * Constructor of the BoundedSourceReadable.
@@ -122,16 +120,6 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
       try {
         reader = boundedSource.createReader(null);
         finished = !reader.start();
-
-        if (!finished) {
-          T elem = reader.getCurrent();
-
-          if (elem instanceof WindowedValue) {
-            windowedValueConverter = val -> (WindowedValue) val;
-          } else {
-            windowedValueConverter = WindowedValue::valueInGlobalWindow;
-          }
-        }
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
@@ -144,12 +132,15 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
       }
 
       final T elem = reader.getCurrent();
-      return windowedValueConverter.apply(elem);
-    }
 
-    @Override
-    public void advance() throws IOException {
-      finished = !reader.advance();
+      try {
+        finished = !reader.advance();
+      } catch (final IOException e) {
+        e.printStackTrace();
+        throw new RuntimeException(e);
+      }
+
+      return WindowedValue.valueInGlobalWindow(elem);
     }
 
     @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index 97adc5b..482dd9d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -31,7 +31,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.function.Function;
 
 /**
  * SourceVertex implementation for UnboundedSource.
@@ -100,8 +99,8 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
       implements Readable<Object> {
     private final UnboundedSource<O, M> unboundedSource;
     private UnboundedSource.UnboundedReader<O> reader;
-    private Function<O, WindowedValue<O>> windowedValueConverter;
-    private boolean finished = false;
+    private boolean isStarted = false;
+    private boolean isCurrentAvailable = false;
 
     UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
       this.unboundedSource = unboundedSource;
@@ -111,45 +110,30 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
     public void prepare() {
       try {
         reader = unboundedSource.createReader(null, null);
-        reader.start();
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
-
-      // get first element
-      final O firstElement = retrieveFirstElement();
-      if (firstElement instanceof WindowedValue) {
-        windowedValueConverter = val -> (WindowedValue) val;
-      } else {
-        windowedValueConverter = WindowedValue::valueInGlobalWindow;
-      }
-    }
-
-    private O retrieveFirstElement() {
-      while (true) {
-        try {
-          return reader.getCurrent();
-        } catch (final NoSuchElementException e) {
-          // the first element is not currently available... retry
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException e1) {
-            e1.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-      }
     }
 
     @Override
     public Object readCurrent() {
-      final O elem = reader.getCurrent();
-      return windowedValueConverter.apply(elem);
-    }
+      try {
+        if (!isStarted) {
+          isStarted = true;
+          isCurrentAvailable = reader.start();
+        } else {
+          isCurrentAvailable = reader.advance();
+        }
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    @Override
-    public void advance() throws IOException {
-      reader.advance();
+      if (isCurrentAvailable) {
+        final O elem = reader.getCurrent();
+        return WindowedValue.timestampedValueInGlobalWindow(elem, reader.getCurrentTimestamp());
+      } else {
+        throw new NoSuchElementException();
+      }
     }
 
     @Override
@@ -159,7 +143,7 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
 
     @Override
     public boolean isFinished() {
-      return finished;
+      return false;
     }
 
     @Override
@@ -169,7 +153,6 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
 
     @Override
     public void close() throws IOException {
-      finished = true;
       reader.close();
     }
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 9ea8fa8..fa4bd8a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -105,7 +105,6 @@ class SourceVertexDataFetcher extends DataFetcher {
 
     // Data
     final Object element = readable.readCurrent();
-    readable.advance();
     return element;
   }
 }
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index e05bbfb..6ae716a 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -694,12 +694,9 @@ public final class TaskExecutorTest {
       if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) {
         throw new NoSuchElementException();
       }
-      return elements.get(pointer);
-    }
-
-    @Override
-    public void advance() throws IOException {
+      final Object element = elements.get(pointer);
       pointer += 1;
+      return element;
     }
 
     @Override