You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/02 03:26:59 UTC

[incubator-nemo] branch master updated: [NEMO-252] Fix CreateViewTransform to emit windowed materialized data (#141)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new a726a5e  [NEMO-252] Fix CreateViewTransform to emit windowed materialized data (#141)
a726a5e is described below

commit a726a5eda3758acfae5d0c6ebee830aecd98a2c3
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Fri Nov 2 12:26:55 2018 +0900

    [NEMO-252] Fix CreateViewTransform to emit windowed materialized data (#141)
    
    JIRA: [NEMO-252: Fix CreateViewTransform to emit windowed materialized data](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-252)
    
    **Major changes:**
    - Fix `CreateViewTransform` to collect windowed data and emit them by applying a view function
    
    **Minor changes**
    - Fix emitting output watermarks in `GroupByKeyAndWindowDoFnTransform`
    
    **Tests for the changes:**
    - `CreateViewTransformTest` that tests materialized data in windows
---
 .../compiler/frontend/beam/PipelineTranslator.java |   2 +-
 .../beam/transform/CreateViewTransform.java        | 108 ++++++++++-----
 .../GroupByKeyAndWindowDoFnTransform.java          |  79 ++++++-----
 .../beam/transform/CreateViewTransformTest.java    | 150 +++++++++++++++++++++
 4 files changed, 278 insertions(+), 61 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 7dc7af6..ee10b21 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -272,7 +272,7 @@ public final class PipelineTranslator
   private static void createPCollectionViewTranslator(final TranslationContext ctx,
                                                       final PrimitiveTransformVertex transformVertex,
                                                       final View.CreatePCollectionView<?, ?> transform) {
-    final IRVertex vertex = new OperatorVertex(new CreateViewTransform<>(transform.getView()));
+    final IRVertex vertex = new OperatorVertex(new CreateViewTransform(transform.getView().getViewFn()));
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
     ctx.registerMainOutputFrom(vertex, transform.getView());
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index d60bcfc..05e5af6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -18,37 +18,43 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.NoWatermarkEmitTransform;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.*;
 
 /**
- * CreateView transform implementation.
- * @param <I> input type.
- * @param <O> output type.
+ * This transform emits materialized data for each window.
+ * @param <I> input type
+ * @param <O> materialized output type
  */
-public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<WindowedValue<I>, WindowedValue<O>> {
-  private final PCollectionView pCollectionView;
+public final class CreateViewTransform<I, O> implements
+  Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
   private OutputCollector<WindowedValue<O>> outputCollector;
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
-  private final MultiView<Object> multiView;
+  private final Map<BoundedWindow, List<I>> windowListMap;
+
+  // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader
+  private boolean isEmitted = false;
+  private long currentOutputWatermark;
 
   /**
    * Constructor of CreateViewTransform.
-   * @param pCollectionView the pCollectionView to create.
+   * @param viewFn the viewFn that materializes data.
    */
-  public CreateViewTransform(final PCollectionView<O> pCollectionView) {
-    this.pCollectionView = pCollectionView;
-    this.viewFn = this.pCollectionView.getViewFn();
-    this.multiView = new MultiView<>();
+  public CreateViewTransform(final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn)  {
+    this.viewFn = viewFn;
+    this.windowListMap = new HashMap<>();
+    this.currentOutputWatermark = Long.MIN_VALUE;
   }
 
   @Override
@@ -57,23 +63,69 @@ public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<Wi
   }
 
   @Override
-  public void onData(final WindowedValue<I> element) {
-    // TODO #216: support window in view
-    final KV kv = ((WindowedValue<KV>) element).getValue();
-    multiView.getDataList().add(kv.getValue());
+  public void onData(final WindowedValue<KV<?, I>> element) {
+    // The key of each element is always null (Beam semantics)
+    // because a view is globally materialized data, regardless of key
+    for (final BoundedWindow window : element.getWindows()) {
+      windowListMap.putIfAbsent(window, new ArrayList<>());
+      final List<I> list = windowListMap.get(window);
+      list.add(element.getValue().getValue());
+    }
+  }
+
+  @Override
+  public void onWatermark(final Watermark inputWatermark) {
+
+    // If there is no buffered data, just forward the watermark
+    if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) {
+      currentOutputWatermark = inputWatermark.getTimestamp();
+      outputCollector.emitWatermark(inputWatermark);
+      return;
+    }
+
+    final Iterator<Map.Entry<BoundedWindow, List<I>>> iterator = windowListMap.entrySet().iterator();
+    long minOutputTimestampOfEmittedWindows = Long.MAX_VALUE;
+
+    while (iterator.hasNext()) {
+      final Map.Entry<BoundedWindow, List<I>> entry = iterator.next();
+      if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) {
+        // emit the windowed data if the watermark timestamp >= the window's max timestamp
+        final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+        outputCollector.emit(WindowedValue.of(
+          view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
+        iterator.remove();
+        isEmitted = true;
+
+        minOutputTimestampOfEmittedWindows =
+          Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis());
+      }
+    }
+
+    if (minOutputTimestampOfEmittedWindows != Long.MAX_VALUE
+      && currentOutputWatermark < minOutputTimestampOfEmittedWindows) {
+      // update current output watermark and emit to next operators
+      currentOutputWatermark = minOutputTimestampOfEmittedWindows;
+      outputCollector.emitWatermark(new Watermark(currentOutputWatermark));
+    }
   }
 
   @Override
   public void close() {
-    final Object view = viewFn.apply(multiView);
-    // TODO #216: support window in view
-    outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view));
+    onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+
+    if (!isEmitted) {
+      // TODO #259: This is an ad-hoc code to resolve the view that has no data
+      // Currently, broadCastWorker reads the view data, but it throws an exception if no data is available for a view.
+      // We should use watermark value to track whether the materialized data in a view is available or not.
+      final O view = viewFn.apply(new MultiView<>(Collections.emptyList()));
+      outputCollector.emit(WindowedValue.valueInGlobalWindow(view));
+    }
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("CreateViewTransform:" + pCollectionView);
+    sb.append("CreateViewTransform:" + viewFn);
     return sb.toString();
   }
 
@@ -82,23 +134,19 @@ public final class CreateViewTransform<I, O> extends NoWatermarkEmitTransform<Wi
    * @param <T> primitive view type
    */
   public final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
-    private final ArrayList<T> dataList;
+    private final Iterable<T> iterable;
 
     /**
      * Constructor.
      */
-    MultiView() {
+    MultiView(final Iterable<T> iterable) {
       // Create a placeholder for side input data. CreateViewTransform#onData stores data to this list.
-      dataList = new ArrayList<>();
+      this.iterable = iterable;
     }
 
     @Override
     public Iterable<T> get(@Nullable final Void aVoid) {
-      return dataList;
-    }
-
-    public ArrayList<T> getDataList() {
-      return dataList;
+      return iterable;
     }
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 7c44b76..7d20f26 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -48,6 +48,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
   private final Map<K, List<WindowedValue<InputT>>> keyToValues;
   private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
   private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
+  private long currentOutputWatermark;
 
   /**
    * GroupByKey constructor.
@@ -69,6 +70,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
       options);
     this.keyToValues = new HashMap<>();
     this.reduceFn = reduceFn;
+    this.currentOutputWatermark = Long.MIN_VALUE;
   }
 
   /**
@@ -113,34 +115,50 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
 
   /**
    * Process the collected data and trigger timers.
-   * @param watermark current watermark
+   * @param inputWatermark current input watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
    */
-  private void processElementsAndTriggerTimers(final Watermark watermark,
+  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
                                                final Instant processingTime,
                                                final Instant synchronizedTime) {
-    keyToValues.forEach((key, val) -> {
+    long minOutputTimestampsOfEmittedWindows = Long.MAX_VALUE;
+
+    for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
+      final K key = entry.getKey();
+      final List<WindowedValue<InputT>> values = entry.getValue();
+
       // for each key
       // Process elements
-      if (!val.isEmpty()) {
+      if (!values.isEmpty()) {
         final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.elementsWorkItem(key, val);
+          KeyedWorkItems.elementsWorkItem(key, values);
         // The DoFnRunner interface requires WindowedValue,
         // but this windowed value is actually not used in the ReduceFnRunner internal.
         getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
       }
 
       // Trigger timers
-      triggerTimers(key, watermark, processingTime, synchronizedTime);
+      final long minOutputTimestamp =
+        triggerTimers(key, inputWatermark, processingTime, synchronizedTime);
+
+      minOutputTimestampsOfEmittedWindows = Math.min(minOutputTimestampsOfEmittedWindows, minOutputTimestamp);
+
       // Remove values
-      val.clear();
-    });
+      values.clear();
+    }
+
+    // Emit watermark to downstream operators
+    if (minOutputTimestampsOfEmittedWindows != Long.MAX_VALUE
+      && currentOutputWatermark < minOutputTimestampsOfEmittedWindows) {
+      currentOutputWatermark = minOutputTimestampsOfEmittedWindows;
+      getOutputCollector().emitWatermark(new Watermark(minOutputTimestampsOfEmittedWindows));
+    }
   }
 
   @Override
-  public void onWatermark(final Watermark watermark) {
-    processElementsAndTriggerTimers(watermark, Instant.now(), Instant.now());
+  public void onWatermark(final Watermark inputWatermark) {
+    processElementsAndTriggerTimers(inputWatermark, Instant.now(), Instant.now());
   }
 
   /**
@@ -161,8 +179,10 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
    * @param watermark watermark
    * @param processingTime processing time
    * @param synchronizedTime synchronized time
+   * @return the minimum output timestamp.
+   * If no data is emitted, it returns Long.MAX_VALUE.
    */
-  private void triggerTimers(final K key,
+  private long triggerTimers(final K key,
                              final Watermark watermark,
                              final Instant processingTime,
                              final Instant synchronizedTime) {
@@ -179,28 +199,27 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
     final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
 
     if (timerDataList.isEmpty()) {
-      return;
-    }
+      return Long.MAX_VALUE;
+    } else {
 
-    // Trigger timers and emit windowed data
-    final KeyedWorkItem<K, InputT> timerWorkItem =
-      KeyedWorkItems.timersWorkItem(key, timerDataList);
-    // The DoFnRunner interface requires WindowedValue,
-    // but this windowed value is actually not used in the ReduceFnRunner internal.
-    getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-
-    // output watermark
-    // we set output watermark to the minimum of the timer data
-    long outputWatermark = Long.MAX_VALUE;
-    for (final TimerInternals.TimerData timer : timerDataList) {
-      if (outputWatermark > timer.getTimestamp().getMillis()) {
-        outputWatermark = timer.getTimestamp().getMillis();
+      // Trigger timers and emit windowed data
+      final KeyedWorkItem<K, InputT> timerWorkItem =
+        KeyedWorkItems.timersWorkItem(key, timerDataList);
+      // The DoFnRunner interface requires WindowedValue,
+      // but this windowed value is actually not used in the ReduceFnRunner internal.
+      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+
+      // output watermark
+      // we set output watermark to the minimum of the timer data
+      long keyOutputTimestamp = Long.MAX_VALUE;
+      for (final TimerInternals.TimerData timer : timerDataList) {
+        keyOutputTimestamp = Math.min(keyOutputTimestamp, timer.getTimestamp().getMillis());
       }
-    }
 
-    // Emit watermark to downstream operators
-    timerInternals.advanceOutputWatermark(new Instant(outputWatermark));
-    getOutputCollector().emitWatermark(new Watermark(outputWatermark));
+      timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
+
+      return keyOutputTimestamp;
+    }
   }
 
   @Override
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
new file mode 100644
index 0000000..762e327
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.transforms.Materialization;
+import org.apache.beam.sdk.transforms.Materializations;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public final class CreateViewTransformTest {
+
+  // [---- window1 --------]         [--------------- window2 ---------------]
+  // ts1 -- ts2 -- ts3 -- watermark -- ts4 -- watermark2 -- ts5 --ts6 --ts7 -- watermark3
+  // (null, "hello")
+  //      (null, "world")
+  //             (null, "hello")
+  //                   ==> window1: {3} (calculate # of elements)
+  //                                 (null, "a")
+  //                                                       (null,"a")
+  //                                                             (null,"a")
+  //                                                                  (null,"b")
+  //                                                       => window2: {4} (calculate # of elements)
+  @Test
+  @SuppressWarnings("unchecked")
+  public void test() {
+
+    final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+    final CreateViewTransform<String, Integer> viewTransform =
+      new CreateViewTransform(new SumViewFn());
+
+    final Instant ts1 = new Instant(1);
+    final Instant ts2 = new Instant(100);
+    final Instant ts3 = new Instant(300);
+    final Watermark watermark = new Watermark(1003);
+    final Instant ts4 = new Instant(1200);
+    final Watermark watermark2 = new Watermark(1400);
+    final Instant ts5 = new Instant(1600);
+    final Instant ts6 = new Instant(1800);
+    final Instant ts7 = new Instant(1900);
+    final Watermark watermark3 = new Watermark(2100);
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<Integer> oc = new TestOutputCollector();
+    viewTransform.prepare(context, oc);
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "hello"), ts1, fixedwindows.assignWindow(ts1), PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "world"), ts2, fixedwindows.assignWindow(ts2), PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "hello"), ts3, fixedwindows.assignWindow(ts3), PaneInfo.NO_FIRING));
+
+    viewTransform.onWatermark(watermark);
+
+    // materialized data
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), oc.outputs.get(0).getWindows());
+    assertEquals(new Integer(3), oc.outputs.get(0).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+    oc.watermarks.clear();
+
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "a"), ts4, fixedwindows.assignWindow(ts4), PaneInfo.NO_FIRING));
+
+    // do not emit anything
+    viewTransform.onWatermark(watermark2);
+    assertEquals(0, oc.outputs.size());
+    assertEquals(0, oc.watermarks.size());
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "a"), ts5, fixedwindows.assignWindow(ts5), PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "a"), ts6, fixedwindows.assignWindow(ts6), PaneInfo.NO_FIRING));
+
+    viewTransform.onData(WindowedValue.of(
+      KV.of(null, "b"), ts7, fixedwindows.assignWindow(ts7), PaneInfo.NO_FIRING));
+
+    // emit windowed value
+    viewTransform.onWatermark(watermark3);
+
+    // materialized data
+    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), oc.outputs.get(0).getWindows());
+    assertEquals(new Integer(4), oc.outputs.get(0).getValue());
+
+    // check output watermark
+    assertEquals(fixedwindows.assignWindow(ts4).maxTimestamp().getMillis(),
+      oc.watermarks.get(0).getTimestamp());
+
+    oc.outputs.clear();
+
+    viewTransform.close();
+    assertEquals(0, oc.outputs.size());
+  }
+
+  final class SumViewFn extends ViewFn<Materializations.MultimapView<Void, String>, Integer> {
+
+    @Override
+    public Materialization<Materializations.MultimapView<Void, String>> getMaterialization() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Integer apply(final Materializations.MultimapView<Void, String> view) {
+      int sum = 0;
+      // MultimapView.get is Nullable
+      for (String s : view.get(null)) {
+        sum += 1;
+      }
+      return sum;
+    }
+  }
+}