You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:19 UTC

[beam] 04/06: Re-code GroupByKeyTranslatorBatch to conserve windowing instead of unwindowing/windowing(GlobalWindow): simplify code, use ReduceFnRunner to merge the windows

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 89df2bf87c7dc926ff9c0a4cb38b83fed1a545a1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon May 27 15:10:02 2019 +0200

    Re-code GroupByKeyTranslatorBatch to conserve windowing instead of unwindowing/windowing(GlobalWindow): simplify code, use ReduceFnRunner to merge the windows
---
 .../batch/GroupByKeyTranslatorBatch.java           |  74 ++++++---
 .../GroupAlsoByWindowViaOutputBufferFn.java        | 168 +++++++++++++++++++++
 2 files changed, 217 insertions(+), 25 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 48cceee..b2b4441 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,16 +17,23 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -42,34 +49,51 @@ class GroupByKeyTranslatorBatch<K, V>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
       TranslationContext context) {
 
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput());
+    @SuppressWarnings("unchecked")
+    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
 
-    // Extract key to group by key only.
-    KeyValueGroupedDataset<K, KV<K, V>> grouped =
-        input
-            .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder())
-            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.genericEncoder());
+    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
 
-    // Materialize grouped values, potential OOM because of creation of new iterable
-    Dataset<KV<K, Iterable<V>>> materialized =
-        grouped.mapGroups(
-            (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>)
-                // TODO: We need to improve this part and avoid creating of new List (potential OOM)
-                // (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)),
-                (key, iterator) -> {
-                  List<V> values = new ArrayList<>();
-                  while (iterator.hasNext()) {
-                    values.add(iterator.next().getValue());
-                  }
-                  return KV.of(key, Iterables.unmodifiableIterable(values));
-                },
-            EncoderHelpers.kvEncoder());
+    //group by key only
+    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input
+        .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(),
+            EncoderHelpers.genericEncoder());
 
-    // Window the result into global window.
-    Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
-        materialized.map(
-            WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder());
+    // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
+    Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups(
+        (MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>) (key, iterator) -> {
+          List<WindowedValue<V>> values = new ArrayList<>();
+          while (iterator.hasNext()) {
+            WindowedValue<KV<K, V>> next = iterator.next();
+            values.add(WindowedValue
+                .of(next.getValue().getValue(), next.getTimestamp(), next.getWindows(),
+                    next.getPane()));
+          }
+          KV<K, Iterable<WindowedValue<V>>> kv = KV.of(key, Iterables.unmodifiableIterable(values));
+          return kv;
+        }, EncoderHelpers.kvEncoder());
+
+    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
+    KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder();
+    // group also by windows
+    Dataset<WindowedValue<KV<K, Iterable<V>>>> output = materialized.flatMap(
+        new GroupAlsoByWindowViaOutputBufferFn<>(windowingStrategy,
+            new InMemoryStateInternalsFactory<>(), SystemReduceFn.buffering(coder.getValueCoder()),
+            context.getSerializableOptions()), EncoderHelpers.windowedValueEncoder());
 
     context.putDataset(context.getOutput(), output);
   }
+
+  /**
+   * In-memory state internals factory.
+   *
+   * @param <K> State key type.
+   */
+  static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
+    @Override
+    public StateInternals stateInternalsForKey(K key) {
+      return InMemoryStateInternals.forKey(key);
+    }
+  }
+
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
new file mode 100644
index 0000000..2fb08f5
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.joda.time.Instant;
+
+/** A FlatMap function that groups by windows in batch mode using {@link ReduceFnRunner}. */
+public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>
+    implements FlatMapFunction<
+        KV<K, Iterable<WindowedValue<InputT>>>,
+        WindowedValue<KV<K, Iterable<InputT>>>> {
+
+  private final WindowingStrategy<?, W> windowingStrategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
+  private final SerializablePipelineOptions options;
+
+  public GroupAlsoByWindowViaOutputBufferFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
+      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
+      SerializablePipelineOptions options) {
+    this.windowingStrategy = windowingStrategy;
+    this.stateInternalsFactory = stateInternalsFactory;
+    this.reduceFn = reduceFn;
+    this.options = options;
+  }
+
+  @Override
+  public Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(
+      KV<K, Iterable<WindowedValue<InputT>>> kv) throws Exception {
+    K key = kv.getKey();
+    Iterable<WindowedValue<InputT>> values = kv.getValue();
+
+    // ------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
+
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
+
+    ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
+        new ReduceFnRunner<>(
+            key,
+            windowingStrategy,
+            ExecutableTriggerStateMachine.create(
+                TriggerStateMachines.stateMachineForTrigger(
+                    TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
+            stateInternals,
+            timerInternals,
+            outputter,
+            new UnsupportedSideInputReader("GroupAlsoByWindow"),
+            reduceFn,
+            options.get());
+
+    // Process the grouped values.
+    reduceFnRunner.processElements(values);
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // not supported yet
+/*
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    fireEligibleTimers(timerInternals, reduceFnRunner);
+*/
+
+    reduceFnRunner.persist();
+
+    return outputter.getOutputs().iterator();
+  }
+
+/*  private void fireEligibleTimers(
+      InMemoryTimerInternals timerInternals,
+      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
+      throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
+    }
+  }*/
+
+  private static class GABWOutputWindowedValue<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public <AdditionalOutputT> void outputWindowedValue(
+        TupleTag<AdditionalOutputT> tag,
+        AdditionalOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+    }
+
+    Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+      return outputs;
+    }
+  }
+}