You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/02 03:26:59 UTC
[incubator-nemo] branch master updated: [NEMO-252] Fix
CreateViewTransform to emit windowed materialized data (#141)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new a726a5e [NEMO-252] Fix CreateViewTransform to emit windowed materialized data (#141)
a726a5e is described below
commit a726a5eda3758acfae5d0c6ebee830aecd98a2c3
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Fri Nov 2 12:26:55 2018 +0900
[NEMO-252] Fix CreateViewTransform to emit windowed materialized data (#141)
JIRA: [NEMO-252: Fix CreateViewTransform to emit windowed materialized data](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-252)
**Major changes:**
- Fix `CreateViewTransform` to collect windowed data and emit them by applying a view function
**Minor changes:**
- Fix emitting output watermarks in `GroupByKeyAndWindowDoFnTransform`
**Tests for the changes:**
- `CreateViewTransformTest` that tests materialized data in windows
---
.../compiler/frontend/beam/PipelineTranslator.java | 2 +-
.../beam/transform/CreateViewTransform.java | 108 ++++++++++-----
.../GroupByKeyAndWindowDoFnTransform.java | 79 ++++++-----
.../beam/transform/CreateViewTransformTest.java | 150 +++++++++++++++++++++
4 files changed, 278 insertions(+), 61 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 7dc7af6..ee10b21 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -272,7 +272,7 @@ public final class PipelineTranslator
private static void createPCollectionViewTranslator(final TranslationContext ctx,
final PrimitiveTransformVertex transformVertex,
final View.CreatePCollectionView<?, ?> transform) {
- final IRVertex vertex = new OperatorVertex(new CreateViewTransform<>(transform.getView()));
+ final IRVertex vertex = new OperatorVertex(new CreateViewTransform(transform.getView().getViewFn()));
ctx.addVertex(vertex);
transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
ctx.registerMainOutputFrom(vertex, transform.getView());
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index d60bcfc..05e5af6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -18,37 +18,43 @@
*/
package org.apache.nemo.compiler.frontend.beam.transform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.*;
/**
- * CreateView transform implementation.
- * @param <I> input type.
- * @param <O> output type.
+ * This transform emits materialized data for each window.
+ * @param <I> input type
+ * @param <O> materialized output type
*/
-public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
- private final PCollectionView pCollectionView;
+public final class CreateViewTransform<I, O> implements
+ Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
private OutputCollector<WindowedValue<O>> outputCollector;
private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
- private final MultiView<Object> multiView;
+ private final Map<BoundedWindow, List<I>> windowListMap;
+
+ // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader
+ private boolean isEmitted = false;
+ private long currentOutputWatermark;
/**
* Constructor of CreateViewTransform.
- * @param pCollectionView the pCollectionView to create.
+ * @param viewFn the viewFn that materializes data.
*/
- public CreateViewTransform(final PCollectionView<O> pCollectionView) {
- this.pCollectionView = pCollectionView;
- this.viewFn = this.pCollectionView.getViewFn();
- this.multiView = new MultiView<>();
+ public CreateViewTransform(final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn) {
+ this.viewFn = viewFn;
+ this.windowListMap = new HashMap<>();
+ this.currentOutputWatermark = Long.MIN_VALUE;
}
@Override
@@ -57,23 +63,69 @@ public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<Wi
}
@Override
- public void onData(final WindowedValue<I> element) {
- // TODO #216: support window in view
- final KV kv = ((WindowedValue<KV>) element).getValue();
- multiView.getDataList().add(kv.getValue());
+ public void onData(final WindowedValue<KV<?, I>> element) {
+ // The key of the element is always null (Beam's semantics)
+ // because a view is globally materialized data, regardless of key
+ for (final BoundedWindow window : element.getWindows()) {
+ windowListMap.putIfAbsent(window, new ArrayList<>());
+ final List<I> list = windowListMap.get(window);
+ list.add(element.getValue().getValue());
+ }
+ }
+
+ @Override
+ public void onWatermark(final Watermark inputWatermark) {
+
+ // If there is no data, just forward the watermark
+ if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) {
+ currentOutputWatermark = inputWatermark.getTimestamp();
+ outputCollector.emitWatermark(inputWatermark);
+ return;
+ }
+
+ final Iterator<Map.Entry<BoundedWindow, List<I>>> iterator = windowListMap.entrySet().iterator();
+ long minOutputTimestampOfEmittedWindows = Long.MAX_VALUE;
+
+ while (iterator.hasNext()) {
+ final Map.Entry<BoundedWindow, List<I>> entry = iterator.next();
+ if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) {
+ // emit the windowed data once the watermark timestamp >= the window's max timestamp
+ final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+ outputCollector.emit(WindowedValue.of(
+ view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ iterator.remove();
+ isEmitted = true;
+
+ minOutputTimestampOfEmittedWindows =
+ Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis());
+ }
+ }
+
+ if (minOutputTimestampOfEmittedWindows != Long.MAX_VALUE
+ && currentOutputWatermark < minOutputTimestampOfEmittedWindows) {
+ // update current output watermark and emit to next operators
+ currentOutputWatermark = minOutputTimestampOfEmittedWindows;
+ outputCollector.emitWatermark(new Watermark(currentOutputWatermark));
+ }
}
@Override
public void close() {
- final Object view = viewFn.apply(multiView);
- // TODO #216: support window in view
- outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view));
+ onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+ if (!isEmitted) {
+ // TODO #259: This is ad-hoc code to handle a view that has no data.
+ // Currently, the broadcast worker reads the view data, but it throws an exception if no data is available for a view.
+ // We should use the watermark value to track whether the materialized data in a view is available or not.
+ final O view = viewFn.apply(new MultiView<>(Collections.emptyList()));
+ outputCollector.emit(WindowedValue.valueInGlobalWindow(view));
+ }
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append("CreateViewTransform:" + pCollectionView);
+ sb.append("CreateViewTransform:" + viewFn);
return sb.toString();
}
@@ -82,23 +134,19 @@ public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<Wi
* @param <T> primitive view type
*/
public final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
- private final ArrayList<T> dataList;
+ private final Iterable<T> iterable;
/**
* Constructor.
*/
- MultiView() {
+ MultiView(final Iterable<T> iterable) {
// Create a placeholder for side input data. CreateViewTransform#onData stores data to this list.
- dataList = new ArrayList<>();
+ this.iterable = iterable;
}
@Override
public Iterable<T> get(@Nullable final Void aVoid) {
- return dataList;
- }
-
- public ArrayList<T> getDataList() {
- return dataList;
+ return iterable;
}
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 7c44b76..7d20f26 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -48,6 +48,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
private final Map<K, List<WindowedValue<InputT>>> keyToValues;
private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
+ private long currentOutputWatermark;
/**
* GroupByKey constructor.
@@ -69,6 +70,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
options);
this.keyToValues = new HashMap<>();
this.reduceFn = reduceFn;
+ this.currentOutputWatermark = Long.MIN_VALUE;
}
/**
@@ -113,34 +115,50 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
/**
* Process the collected data and trigger timers.
- * @param watermark current watermark
+ * @param inputWatermark current input watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
- private void processElementsAndTriggerTimers(final Watermark watermark,
+ private void processElementsAndTriggerTimers(final Watermark inputWatermark,
final Instant processingTime,
final Instant synchronizedTime) {
- keyToValues.forEach((key, val) -> {
+ long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;
+
+ for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
+ final K key = entry.getKey();
+ final List<WindowedValue<InputT>> values = entry.getValue();
+
// for each key
// Process elements
- if (!val.isEmpty()) {
+ if (!values.isEmpty()) {
final KeyedWorkItem<K, InputT> keyedWorkItem =
- KeyedWorkItems.elementsWorkItem(key, val);
+ KeyedWorkItems.elementsWorkItem(key, values);
// The DoFnRunner interface requires WindowedValue,
// but this windowed value is actually not used in the ReduceFnRunner internal.
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
}
// Trigger timers
- triggerTimers(key, watermark, processingTime, synchronizedTime);
+ final long minOutputTimestamp =
+ triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
+
+ minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+
// Remove values
- val.clear();
- });
+ values.clear();
+ }
+
+ // Emit watermark to downstream operators
+ if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
+ && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
+ currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
+ getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+ }
}
@Override
- public void onWatermark(final Watermark watermark) {
- processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now());
+ public void onWatermark(final Watermark inputWatermark) {
+ processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
}
/**
@@ -161,8 +179,10 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
* @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
+ * @return the minimum output timestamp.
+ * If no data is emitted, it returns Long.MAX_VALUE.
*/
- private void triggerTimers(final K key,
+ private long triggerTimers(final K key,
final Watermark watermark,
final Instant processingTime,
final Instant synchronizedTime) {
@@ -179,28 +199,27 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
if (timerDataList.isEmpty()) {
- return;
- }
+ return Long.MAX_VALUE;
+ } else {
- // Trigger timers and emit windowed data
- final KeyedWorkItem<K, InputT> timerWorkItem =
- KeyedWorkItems.timersWorkItem(key, timerDataList);
- // The DoFnRunner interface requires WindowedValue,
- // but this windowed value is actually not used in the ReduceFnRunner internal.
- getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-
- // output watermark
- // we set output watermark to the minimum of the timer data
- long outputWatermark = Long.MAX_VALUE;
- for (final TimerInternals.TimerData timer : timerDataList) {
- if (outputWatermark > timer.getTimestamp().getMillis()) {
- outputWatermark = timer.getTimestamp().getMillis();
+ // Trigger timers and emit windowed data
+ final KeyedWorkItem<K, InputT> timerWorkItem =
+ KeyedWorkItems.timersWorkItem(key, timerDataList);
+ // The DoFnRunner interface requires WindowedValue,
+ // but this windowed value is actually not used in the ReduceFnRunner internal.
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+
+ // output watermark
+ // we set output watermark to the minimum of the timer data
+ long keyOutputTimestamp = Long.MAX_VALUE;
+ for (final TimerInternals.TimerData timer : timerDataList) {
+ keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis());
}
- }
- // Emit watermark to downstream operators
- timerInternals.advanceOutputWatermark(new Instant(outputWatermark));
- getOutputCollector().emitWatermark(new Watermark(outputWatermark));
+ timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
+
+ return keyOutputTimestamp;
+ }
}
@Override
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
new file mode 100644
index 0000000..762e327
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class CreateViewTransformTest {
+
+ // [---- window1 --------] [--------------- window2 ---------------]
+ // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 -- ts6 -- ts7 -- watermark3
+ // (null, "hello")
+ // (null, "world")
+ // (null, "hello")
+ // ==> window1: {3} (calculate # of elements)
+ // (null, "a")
+ // (null,"a")
+ // (null,"a")
+ // (null,"b")
+ // => window2: {4} (calculate # of elements)
+ @Test
+ @SuppressWarnings("unchecked")
+ public void test() {
+
+ final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+ final CreateViewTransform<String, Integer> viewTransform =
+ new CreateViewTransform(new SumViewFn());
+
+ final Instant ts1 = new Instant(1);
+ final Instant ts2 = new Instant(100);
+ final Instant ts3 = new Instant(300);
+ final Watermark watermark = new Watermark(1003);
+ final Instant ts4 = new Instant(1200);
+ final Watermark watermark2 = new Watermark(1400);
+ final Instant ts5 = new Instant(1600);
+ final Instant ts6 = new Instant(1800);
+ final Instant ts7 = new Instant(1900);
+ final Watermark watermark3 = new Watermark(2100);
+
+
+ final Transform.Context context = mock(Transform.Context.class);
+ final TestOutputCollector<Integer> oc = new TestOutputCollector();
+ viewTransform.prepare(context, oc);
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+
+ viewTransform.onWatermark(watermark);
+
+ // materialized data
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+ assertEquals(new Integer(3), oc.outputs.get(0).getValue());
+
+ // check output watermark
+ assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.outputs.clear();
+ oc.watermarks.clear();
+
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+
+ // do not emit anything
+ viewTransform.onWatermark(watermark2);
+ assertEquals(0, oc.outputs.size());
+ assertEquals(0, oc.watermarks.size());
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+
+ viewTransform.onData(WindowedValue.of(
+ KV.of(null, "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+
+ // emit windowed value
+ viewTransform.onWatermark(watermark3);
+
+ // materialized data
+ assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
+ assertEquals(new Integer(4), oc.outputs.get(0).getValue());
+
+ // check output watermark
+ assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.outputs.clear();
+
+ viewTransform.close();
+ assertEquals(0, oc.outputs.size());
+ }
+
+ final class SumViewFn extends ViewFn<Materializations.MultimapView<Void, String>, Integer> {
+
+ @Override
+ public Materialization<Materializations.MultimapView<Void, String>> getMaterialization() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Integer apply(final Materializations.MultimapView<Void, String> view) {
+ int sum = 0;
+ // MultimapView.get is Nullable
+ for (String s : view.get(null)) {
+ sum += 1;
+ }
+ return sum;
+ }
+ }
+}