You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/25 17:30:07 UTC

[12/50] [abbrv] beam git commit: [BEAM-1915] Removes use of OldDoFn from Apex

[BEAM-1915] Removes use of OldDoFn from Apex

This is the last occurrence of OldDoFn in the Beam repository
outside of OldDoFn itself.

OldDoFn is also still used in the Dataflow worker; the class can be
deleted entirely once we (the Dataflow team) remove that usage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e243881
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e243881
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e243881

Branch: refs/heads/jstorm-runner
Commit: 3e243881fe767cf30869abf5c745c26f96d66fc4
Parents: fdbadfc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 10 22:51:16 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 14 23:34:11 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       | 225 ++++++-------------
 1 file changed, 63 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3e243881/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 1697921..7d17ac6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -42,32 +42,29 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.WindowingInternals;
+import org.apache.beam.runners.core.construction.Triggers;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -98,8 +95,6 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   private final StateInternalsFactory<K> stateInternalsFactory;
   private Map<Slice, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
 
-  private transient ProcessContext context;
-  private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
   private transient ApexTimerInternals timerInternals = new ApexTimerInternals();
   private Instant inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
@@ -161,16 +156,53 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   @Override
   public void setup(OperatorContext context) {
     this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
-    StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
-    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy,
-        stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
-    this.context = new ProcessContext(fn, this.timerInternals);
   }
 
   @Override
   public void teardown() {
   }
 
+
+  private ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> newReduceFnRunner(K key) {
+    return new ReduceFnRunner<>(
+        key,
+        windowingStrategy,
+        ExecutableTriggerStateMachine.create(
+            TriggerStateMachines.stateMachineForTrigger(
+                Triggers.toProto(windowingStrategy.getTrigger()))),
+        stateInternalsFactory.stateInternalsForKey(key),
+        timerInternals,
+        new OutputWindowedValue<KV<K, Iterable<V>>>() {
+          @Override
+          public void outputWindowedValue(
+              KV<K, Iterable<V>> output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            if (traceTuples) {
+              LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+            }
+            ApexGroupByKeyOperator.this.output.emit(
+                ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
+          }
+
+          @Override
+          public <AdditionalOutputT> void outputWindowedValue(
+              TupleTag<AdditionalOutputT> tag,
+              AdditionalOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+            throw new UnsupportedOperationException(
+                "GroupAlsoByWindow should not use side outputs");
+          }
+        },
+        NullSideInputReader.empty(),
+        null,
+        SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder),
+        serializedOptions.get());
+  }
+
   /**
    * Returns the list of timers that are ready to fire. These are the timers
    * that are registered to be triggered at a time before the current watermark.
@@ -212,13 +244,11 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
         windowedValue.getTimestamp(),
         windowedValue.getWindows(),
         windowedValue.getPane());
-
-    KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
-            kv.getKey(),
-            Collections.singletonList(updatedWindowedValue));
-
-    context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-    fn.processElement(context);
+    timerInternals.setKey(kv.getKey());
+    ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
+        newReduceFnRunner(kv.getKey());
+    reduceFnRunner.processElements(Collections.singletonList(updatedWindowedValue));
+    reduceFnRunner.persist();
   }
 
   private StateInternals<K> getStateInternalsForKey(K key) {
@@ -265,158 +295,29 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     if (!timers.isEmpty()) {
       for (Slice keyBytes : timers.keySet()) {
         K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.buffer);
-        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
-        context.setElement(kwi, getStateInternalsForKey(kwi.key()));
-        fn.processElement(context);
+        timerInternals.setKey(key);
+        ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = newReduceFnRunner(key);
+        reduceFnRunner.onTimers(timers.get(keyBytes));
+        reduceFnRunner.persist();
       }
     }
   }
 
-  private class ProcessContext extends GroupAlsoByWindowViaWindowSetDoFn<K, V, Iterable<V>, ?,
-      KeyedWorkItem<K, V>>.ProcessContext {
-
-    private final ApexTimerInternals timerInternals;
-    private StateInternals<K> stateInternals;
-    private KeyedWorkItem<K, V> element;
-
-    public ProcessContext(OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> function,
-                          ApexTimerInternals timerInternals) {
-      function.super();
-      this.timerInternals = checkNotNull(timerInternals);
-    }
-
-    public void setElement(KeyedWorkItem<K, V> element, StateInternals<K> stateForKey) {
-      this.element = element;
-      this.stateInternals = stateForKey;
-    }
-
-    @Override
-    public KeyedWorkItem<K, V> element() {
-      return this.element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      throw new UnsupportedOperationException(
-          "timestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.get();
-    }
-
-    @Override
-    public void output(KV<K, Iterable<V>> output) {
-      throw new UnsupportedOperationException(
-          "output() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public void outputWithTimestamp(KV<K, Iterable<V>> output, Instant timestamp) {
-      throw new UnsupportedOperationException(
-          "outputWithTimestamp() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public PaneInfo pane() {
-      throw new UnsupportedOperationException(
-          "pane() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "window() is not available when processing KeyedWorkItems.");
-    }
-
-    @Override
-    public WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> windowingInternals() {
-      return new WindowingInternals<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>() {
-
-        @Override
-        public StateInternals<K> stateInternals() {
-          return stateInternals;
-        }
-
-        @Override
-        public void outputWindowedValue(
-            KV<K, Iterable<V>> output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          if (traceTuples) {
-            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
-          }
-          ApexGroupByKeyOperator.this.output.emit(
-              ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
-        }
-
-        @Override
-        public <AdditionalOutputT> void outputWindowedValue(
-            TupleTag<AdditionalOutputT> tag,
-            AdditionalOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException(
-              "GroupAlsoByWindow should not use tagged outputs");
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return timerInternals;
-        }
-
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          throw new UnsupportedOperationException("windows() is not available in Streaming mode.");
-        }
-
-        @Override
-        public PaneInfo pane() {
-          throw new UnsupportedOperationException("pane() is not available in Streaming mode.");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
-          throw new RuntimeException("sideInput() is not available in Streaming mode.");
-        }
-      };
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      throw new RuntimeException("sideInput() is not supported in Streaming mode.");
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output) {
-      throw new RuntimeException("output() is not available when grouping by window.");
-    }
-
-    @Override
-    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      output(tag, output);
-    }
-
-    @Override
-    public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
-        String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
   /**
    * An implementation of Beam's {@link TimerInternals}.
    *
    */
   private class ApexTimerInternals implements TimerInternals {
+    private K key;
+
+    public void setKey(K key) {
+      this.key = key;
+    }
 
     @Deprecated
     @Override
     public void setTimer(TimerData timerData) {
-      registerActiveTimer(context.element().key(), timerData);
+      registerActiveTimer(key, timerData);
     }
 
     @Override
@@ -427,7 +328,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
     @Deprecated
     @Override
     public void deleteTimer(TimerData timerKey) {
-      unregisterActiveTimer(context.element().key(), timerKey);
+      unregisterActiveTimer(key, timerKey);
     }
 
     @Override