You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/10/25 06:59:11 UTC
[incubator-nemo] branch master updated: [NEMO-128] Support Beam UnboundedSource (#125)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 13cb991 [NEMO-128] Support Beam UnboundedSource (#125)
13cb991 is described below
commit 13cb99155e0ae537b6cb65528839beface3fc6fb
Author: Gyewon Lee <st...@gmail.com>
AuthorDate: Thu Oct 25 15:59:06 2018 +0900
[NEMO-128] Support Beam UnboundedSource (#125)
JIRA: [NEMO-128: Support Beam UnboundedSource](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-128)
**Major changes:**
- Support Beam UnboundedSources
**Minor changes to note:**
-
**Tests for the changes:**
- None (this commit adds no test files)
**Other comments:**
-
Closes #125
---
.../compiler/frontend/beam/PipelineTranslator.java | 11 ++
.../beam/source/BeamUnboundedSourceVertex.java | 179 +++++++++++++++++++++
2 files changed, 190 insertions(+)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index a4e5d1b..9fab64e 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -34,6 +34,7 @@ import org.apache.nemo.compiler.frontend.beam.PipelineVisitor.*;
import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
import org.apache.nemo.compiler.frontend.beam.source.BeamBoundedSourceVertex;
+import org.apache.nemo.compiler.frontend.beam.source.BeamUnboundedSourceVertex;
import org.apache.nemo.compiler.frontend.beam.transform.*;
import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.io.Read;
@@ -116,6 +117,16 @@ public final class PipelineTranslator
}
}
+  /**
+   * Translates a {@link Read.Unbounded} transform into a {@link BeamUnboundedSourceVertex}.
+   * @param ctx             provides translation context.
+   * @param transformVertex the given transform vertex.
+   * @param transform       the unbounded read transform whose source is wrapped.
+   */
+  @PrimitiveTransformTranslator(Read.Unbounded.class)
+  private static void unboundedReadTranslator(final TranslationContext ctx,
+                                              final PrimitiveTransformVertex transformVertex,
+                                              final Read.Unbounded<?> transform) {
+    final IRVertex sourceVertex = new BeamUnboundedSourceVertex<>(transform.getSource());
+    ctx.addVertex(sourceVertex);
+    // Add an edge for each input of the node, then register each output as a main output of the new vertex.
+    transformVertex.getNode().getInputs().values()
+        .forEach(inputValue -> ctx.addEdgeTo(sourceVertex, inputValue));
+    transformVertex.getNode().getOutputs().values()
+        .forEach(outputValue -> ctx.registerMainOutputFrom(sourceVertex, outputValue));
+  }
+
@PrimitiveTransformTranslator(Read.Bounded.class)
private static void boundedReadTranslator(final TranslationContext ctx,
final PrimitiveTransformVertex transformVertex,
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
new file mode 100644
index 0000000..b799d3b
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -0,0 +1,179 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.source;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.Readable;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.SourceVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * SourceVertex implementation for UnboundedSource.
+ * @param <O> output type.
+ * @param <M> checkpoint mark type.
+ */
+public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.CheckpointMark> extends
+ SourceVertex<WindowedValue<O>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
+ private UnboundedSource<O, M> source;
+ private final String sourceDescription;
+
+ private static final long POLLING_INTERVAL = 10L;
+
+ /**
+ * The default constructor for beam unbounded source.
+ * @param source unbounded source.
+ */
+ public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source) {
+ super();
+ this.source = source;
+ this.sourceDescription = source.toString();
+ }
+
+ private BeamUnboundedSourceVertex(final BeamUnboundedSourceVertex<O, M> that) {
+ super(that);
+ this.source = that.source;
+ this.sourceDescription = that.source.toString();
+ }
+
+ @Override
+ public IRVertex getClone() {
+ return new BeamUnboundedSourceVertex<>(this);
+ }
+
+ @Override
+ public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
+ final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
+ source.split(desiredNumOfSplits, null)
+ .forEach(unboundedSource -> readables.add(new UnboundedSourceReadable<>(unboundedSource)));
+ return readables;
+ }
+
+ @Override
+ public void clearInternalStates() {
+ source = null;
+ }
+
+ @Override
+ public ObjectNode getPropertiesAsJsonNode() {
+ final ObjectNode node = getIRVertexPropertiesAsJsonNode();
+ node.put("source", sourceDescription);
+ return node;
+ }
+
+ /**
+ * UnboundedSourceReadable class.
+ * @param <O> output type.
+ * @param <M> checkpoint mark type.
+ */
+ private static final class UnboundedSourceReadable<O, M extends UnboundedSource.CheckpointMark>
+ implements Readable<WindowedValue<O>> {
+ private final UnboundedSource<O, M> unboundedSource;
+
+ UnboundedSourceReadable(final UnboundedSource<O, M> unboundedSource) {
+ this.unboundedSource = unboundedSource;
+ }
+
+ @Override
+ public Iterable<WindowedValue<O>> read() throws IOException {
+ return new UnboundedSourceIterable<>(unboundedSource);
+ }
+
+ @Override
+ public List<String> getLocations() throws Exception {
+ return new ArrayList<>();
+ }
+ }
+
+ /**
+ * The iterable class for unbounded sources.
+ * @param <O> output type.
+ * @param <M> checkpoint mark type.
+ */
+ private static final class UnboundedSourceIterable<O, M extends UnboundedSource.CheckpointMark>
+ implements Iterable<WindowedValue<O>> {
+
+ private UnboundedSourceIterator<O, M> iterator;
+
+ UnboundedSourceIterable(final UnboundedSource<O, M> unboundedSource) throws IOException {
+ this.iterator = new UnboundedSourceIterator<>(unboundedSource);
+ }
+
+ @Override
+ public Iterator<WindowedValue<O>> iterator() {
+ return iterator;
+ }
+ }
+
+ /**
+ * The iterator for unbounded sources.
+ * @param <O> output type.
+ * @param <M> checkpoint mark type.
+ */
+ // TODO #233: Emit watermark at unbounded source
+ private static final class UnboundedSourceIterator<O, M extends UnboundedSource.CheckpointMark>
+ implements Iterator<WindowedValue<O>> {
+
+ private final UnboundedSource.UnboundedReader<O> unboundedReader;
+ private boolean available;
+
+ UnboundedSourceIterator(final UnboundedSource<O, M> unboundedSource) throws IOException {
+ this.unboundedReader = unboundedSource.createReader(null, null);
+ available = unboundedReader.start();
+ }
+
+ @Override
+ public boolean hasNext() {
+ // Unbounded source always has next element until it finishes.
+ return true;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public WindowedValue<O> next() {
+ try {
+ while (true) {
+ if (!available) {
+ Thread.sleep(POLLING_INTERVAL);
+ } else {
+ final O element = unboundedReader.getCurrent();
+ final boolean windowed = element instanceof WindowedValue;
+ if (!windowed) {
+ return WindowedValue.valueInGlobalWindow(element);
+ } else {
+ return (WindowedValue<O>) element;
+ }
+ }
+ available = unboundedReader.advance();
+ }
+ } catch (final InterruptedException | IOException e) {
+ LOG.error("Exception occurred while waiting for the events...");
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}