You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/10/25 06:59:11 UTC

[incubator-nemo] branch master updated: [NEMO-128] Support Beam UnboundedSource (#125)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 13cb991  [NEMO-128] Support Beam UnboundedSource (#125)
13cb991 is described below

commit 13cb99155e0ae537b6cb65528839beface3fc6fb
Author: Gyewon Lee <st...@gmail.com>
AuthorDate: Thu Oct 25 15:59:06 2018 +0900

    [NEMO-128] Support Beam UnboundedSource (#125)
    
    JIRA: [NEMO-128: Support Beam UnboundedSource](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-128)
    
    **Major changes:**
    - Support Beam UnboundedSources
    
    **Minor changes to note:**
    -
    
    **Tests for the changes:**
    -
    
    **Other comments:**
    -
    
    Closes #
---
 .../compiler/frontend/beam/PipelineTranslator.java |  11 ++
 .../beam/source/BeamUnboundedSourceVertex.java     | 179 +++++++++++++++++++++
 2 files changed, 190 insertions(+)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index a4e5d1b..9fab64e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -34,6 +34,7 @@ import org.apache.nemo.compiler.frontend.beam.PipelineVisitor.*;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
 import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
 import org.apache.nemo.compiler.frontend.beam.source.BeamBoundedSourceVertex;
+import org.apache.nemo.compiler.frontend.beam.source.BeamUnboundedSourceVertex;
 import org.apache.nemo.compiler.frontend.beam.transform.*;
 import org.apache.beam.sdk.coders.*;
 import org.apache.beam.sdk.io.Read;
@@ -116,6 +117,16 @@ public final class PipelineTranslator
     }
   }
 
+  /**
+   * Translates a Beam {@link Read.Unbounded} primitive transform into a {@link BeamUnboundedSourceVertex}.
+   * The new vertex is added to the context, connected to every input of the transform node,
+   * and registered as the main-output producer of every output of the transform node.
+   *
+   * @param ctx             translation context that receives the vertex and its edges.
+   * @param transformVertex pipeline vertex wrapping the Beam transform node being translated.
+   * @param transform       the unbounded read transform whose source backs the new vertex.
+   */
+  @PrimitiveTransformTranslator(Read.Unbounded.class)
+  private static void unboundedReadTranslator(final TranslationContext ctx,
+                                              final PrimitiveTransformVertex transformVertex,
+                                              final Read.Unbounded<?> transform) {
+    final IRVertex sourceVertex = new BeamUnboundedSourceVertex<>(transform.getSource());
+    ctx.addVertex(sourceVertex);
+    transformVertex.getNode().getInputs().values()
+      .forEach(incoming -> ctx.addEdgeTo(sourceVertex, incoming));
+    transformVertex.getNode().getOutputs().values()
+      .forEach(produced -> ctx.registerMainOutputFrom(sourceVertex, produced));
+  }
+
   @PrimitiveTransformTranslator(Read.Bounded.class)
   private static void boundedReadTranslator(final TranslationContext ctx,
                                             final PrimitiveTransformVertex transformVertex,
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
new file mode 100644
index 0000000..b799d3b
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.source;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * SourceVertex implementation for UnboundedSource.
+ * The vertex wraps a Beam {@link UnboundedSource}; each split of that source is exposed as a
+ * {@link Readable} whose iterable polls the underlying reader for elements.
+ * @param <O> output type.
+ * @param <M> checkpoint mark type.
+ */
+public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.CheckpointMark> extends
+  SourceVertex<WindowedValue<O>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BeamUnboundedSourceVertex.class);
+
+  /** Time (passed to {@link Thread#sleep(long)}) to wait before polling the reader again. */
+  private static final long POLLING_INTERVAL = 10L;
+
+  /** The wrapped source; set to null by {@link #clearInternalStates()}. */
+  private UnboundedSource<O, M> source;
+  /** String form of the source captured at construction, so it outlives {@link #clearInternalStates()}. */
+  private final String sourceDescription;
+
+  /**
+   * The default constructor for beam unbounded source.
+   * @param source unbounded source.
+   */
+  public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source) {
+    super();
+    this.source = source;
+    this.sourceDescription = source.toString();
+  }
+
+  /**
+   * Copy constructor used by {@link #getClone()}.
+   * @param that the vertex to copy.
+   */
+  private BeamUnboundedSourceVertex(final BeamUnboundedSourceVertex<O, M> that) {
+    super(that);
+    this.source = that.source;
+    // Copy the cached description rather than calling that.source.toString():
+    // that.source is null once clearInternalStates() has been invoked on the original.
+    this.sourceDescription = that.sourceDescription;
+  }
+
+  @Override
+  public IRVertex getClone() {
+    return new BeamUnboundedSourceVertex<>(this);
+  }
+
+  /**
+   * Splits the wrapped source and wraps every split in a {@link Readable}.
+   * @param desiredNumOfSplits the number of splits requested from the source.
+   * @return one readable per split returned by the source.
+   * @throws Exception if the source fails to split.
+   */
+  @Override
+  public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
+    final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
+    // No PipelineOptions are passed to the source here (null).
+    for (final UnboundedSource<O, M> split : source.split(desiredNumOfSplits, null)) {
+      readables.add(new UnboundedSourceReadable<>(split));
+    }
+    return readables;
+  }
+
+  @Override
+  public void clearInternalStates() {
+    source = null;
+  }
+
+  @Override
+  public ObjectNode getPropertiesAsJsonNode() {
+    final ObjectNode node = getIRVertexPropertiesAsJsonNode();
+    node.put("source", sourceDescription);
+    return node;
+  }
+
+  /**
+   * UnboundedSourceReadable class.
+   * @param <O> output type.
+   * @param <M> checkpoint mark type.
+   */
+  private static final class UnboundedSourceReadable<O, M extends UnboundedSource.CheckpointMark>
+      implements Readable<WindowedValue<O>> {
+    private final UnboundedSource<O, M> unboundedSource;
+
+    /**
+     * @param unboundedSource the (split) source this readable reads from.
+     */
+    UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
+      this.unboundedSource = unboundedSource;
+    }
+
+    @Override
+    public Iterable<WindowedValue<O>> read() throws IOException {
+      return new UnboundedSourceIterable<>(unboundedSource);
+    }
+
+    /**
+     * @return an empty list; no location information is provided for unbounded sources.
+     */
+    @Override
+    public List<String> getLocations() throws Exception {
+      return new ArrayList<>();
+    }
+  }
+
+  /**
+   * The iterable class for unbounded sources.
+   * The single backing iterator (and its reader) is created in the constructor, and
+   * {@link #iterator()} hands out that same instance on every call.
+   * @param <O> output type.
+   * @param <M> checkpoint mark type.
+   */
+  private static final class UnboundedSourceIterable<O, M extends UnboundedSource.CheckpointMark>
+      implements Iterable<WindowedValue<O>> {
+
+    private final UnboundedSourceIterator<O, M> iterator;
+
+    /**
+     * @param unboundedSource the source to iterate over.
+     * @throws IOException if the reader cannot be created or started.
+     */
+    UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource) throws IOException {
+      this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+    }
+
+    @Override
+    public Iterator<WindowedValue<O>> iterator() {
+      return iterator;
+    }
+  }
+
+  /**
+   * The iterator for unbounded sources.
+   * @param <O> output type.
+   * @param <M> checkpoint mark type.
+   */
+  // TODO #233: Emit watermark at unbounded source
+  private static final class UnboundedSourceIterator<O, M extends UnboundedSource.CheckpointMark>
+      implements Iterator<WindowedValue<O>> {
+
+    private final UnboundedSource.UnboundedReader<O> unboundedReader;
+    /** True while the reader is positioned on an element that has not been returned by next() yet. */
+    private boolean available;
+
+    /**
+     * Creates and starts a reader on the given source.
+     * @param unboundedSource the source to read from.
+     * @throws IOException if the reader cannot be created or started.
+     */
+    UnboundedSourceIterator(final UnboundedSource<O, M> unboundedSource) throws IOException {
+      // Neither PipelineOptions nor a checkpoint mark are supplied (both null).
+      this.unboundedReader = unboundedSource.createReader(null, null);
+      available = unboundedReader.start();
+    }
+
+    @Override
+    public boolean hasNext() {
+      // Unbounded source always has next element until it finishes.
+      return true;
+    }
+
+    /**
+     * Waits until the reader has an unread element and returns it.
+     * While no element is available, the reader is polled every POLLING_INTERVAL.
+     * @return the next element, wrapped in the global window unless it already is a WindowedValue.
+     * @throws RuntimeException wrapping the IOException thrown by the reader, or the
+     *         InterruptedException raised while waiting (the interrupt flag is restored first).
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public WindowedValue<O> next() {
+      try {
+        // Move the reader forward until it is positioned on an element not yet handed out.
+        while (!available) {
+          available = unboundedReader.advance();
+          if (!available) {
+            Thread.sleep(POLLING_INTERVAL);
+          }
+        }
+
+        final O element = unboundedReader.getCurrent();
+        // Mark the current element as consumed so that the following call advances the reader
+        // instead of returning this very element again.
+        available = false;
+
+        if (element instanceof WindowedValue) {
+          return (WindowedValue<O>) element;
+        }
+        return WindowedValue.valueInGlobalWindow(element);
+      } catch (final InterruptedException e) {
+        // Restore the interrupt status so that callers further up can observe the interruption.
+        Thread.currentThread().interrupt();
+        LOG.error("Exception occurred while waiting for the events...", e);
+        throw new RuntimeException(e);
+      } catch (final IOException e) {
+        LOG.error("Exception occurred while waiting for the events...", e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}