You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:31 UTC

[17/50] incubator-beam git commit: Port easy parts of DataflowRunner to new DoFn

Port easy parts of DataflowRunner to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fcdd15b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fcdd15b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fcdd15b8

Branch: refs/heads/gearpump-runner
Commit: fcdd15b81b93f87de0aa02bfb3b09740bc259c4c
Parents: a1d601a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Aug 8 20:35:59 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Aug 9 12:41:52 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 92 ++++++++++----------
 1 file changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fcdd15b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fadd9c7..4d34ec4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -761,31 +761,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> {
 
     /**
-     * A {@link OldDoFn} that for each element outputs a {@code KV} structure suitable for
+     * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for
      * grouping by the hash of the window's byte representation and sorting the grouped values
      * using the window's byte representation.
      */
     @SystemDoFnInternal
     private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow>
-        extends OldDoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements
-        OldDoFn.RequiresWindowAccess {
+        extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> {
 
       private final IsmRecordCoder<?> ismCoderForHash;
       private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) {
         this.ismCoderForHash = ismCoderForHash;
       }
 
-      @Override
-      public void processElement(ProcessContext c) throws Exception {
+      @ProcessElement
+      public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws Exception {
         @SuppressWarnings("unchecked")
-        W window = (W) c.window();
+        W window = (W) untypedWindow;
         c.output(
             KV.of(ismCoderForHash.hash(ImmutableList.of(window)),
                 KV.of(window,
                     WindowedValue.of(
                         c.element(),
                         c.timestamp(),
-                        c.window(),
+                        window,
                         c.pane()))));
       }
     }
@@ -828,14 +827,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       extends PTransform<PCollection<T>, PCollectionView<T>> {
 
     /**
-     * A {@link OldDoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
+     * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
      * <ul>
      *   <li>Key 1: Window
      *   <li>Value: Windowed value
      * </ul>
      */
     static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
                              IsmRecord<WindowedValue<T>>> {
 
       private final Coder<W> windowCoder;
@@ -843,7 +842,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         this.windowCoder = windowCoder;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         Optional<Object> previousWindowStructuralValue = Optional.absent();
         T previousValue = null;
@@ -902,7 +901,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         applyForSingleton(
             DataflowRunner runner,
             PCollection<T> input,
-            OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+            DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
                              IsmRecord<WindowedValue<FinalT>>> doFn,
             boolean hasDefault,
             FinalT defaultValue,
@@ -998,7 +997,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   static class BatchViewAsList<T>
       extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
     /**
-     * A {@link OldDoFn} which creates {@link IsmRecord}s assuming that each element is within the
+     * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
      * global window. Each {@link IsmRecord} has
      * <ul>
      *   <li>Key 1: Global window</li>
@@ -1008,15 +1007,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      */
     @SystemDoFnInternal
     static class ToIsmRecordForGlobalWindowDoFn<T>
-        extends OldDoFn<T, IsmRecord<WindowedValue<T>>> {
+        extends DoFn<T, IsmRecord<WindowedValue<T>>> {
 
       long indexInBundle;
-      @Override
+      @StartBundle
       public void startBundle(Context c) throws Exception {
         indexInBundle = 0;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         c.output(IsmRecord.of(
             ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle),
@@ -1030,7 +1029,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
+     * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements' windows
      * to locate the window boundaries. The {@link IsmRecord} has:
      * <ul>
      *   <li>Key 1: Window</li>
@@ -1040,7 +1039,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      */
     @SystemDoFnInternal
     static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
                              IsmRecord<WindowedValue<T>>> {
 
       private final Coder<W> windowCoder;
@@ -1048,7 +1047,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         this.windowCoder = windowCoder;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         long elementsInWindow = 0;
         Optional<Object> previousWindowStructuralValue = Optional.absent();
@@ -1174,7 +1173,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
 
     /**
-     * A {@link OldDoFn} which groups elements by window boundaries. For each group,
+     * A {@link DoFn} which groups elements by window boundaries. For each group,
      * the group of elements is transformed into a {@link TransformedMap}.
      * The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>}
      * and contains a function {@code WindowedValue<V> -> V}.
@@ -1188,7 +1187,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * </ul>
      */
     static class ToMapDoFn<K, V, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
                              IsmRecord<WindowedValue<TransformedMap<K,
                                                      WindowedValue<V>,
                                                      V>>>> {
@@ -1198,7 +1197,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         this.windowCoder = windowCoder;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c)
           throws Exception {
         Optional<Object> previousWindowStructuralValue = Optional.absent();
@@ -1358,18 +1357,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @SystemDoFnInternal
       private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
-          extends OldDoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
-          implements OldDoFn.RequiresWindowAccess {
+          extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> {
 
         private final IsmRecordCoder<?> coder;
         private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) {
           this.coder = coder;
         }
 
-        @Override
-        public void processElement(ProcessContext c) throws Exception {
+        @ProcessElement
+        public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws Exception {
           @SuppressWarnings("unchecked")
-          W window = (W) c.window();
+          W window = (W) untypedWindow;
 
           c.output(
               KV.of(coder.hash(ImmutableList.of(c.element().getKey())),
@@ -1377,7 +1375,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                       WindowedValue.of(
                           c.element().getValue(),
                           c.timestamp(),
-                          (BoundedWindow) window,
+                          untypedWindow,
                           c.pane()))));
         }
       }
@@ -1412,7 +1410,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
+     * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements' windows
      * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have:
      * <ul>
      *   <li>Key 1: Window</li>
@@ -1424,11 +1422,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet}
      * and the unique key count per window to {@code outputForSize}.
      *
-     * <p>Finally, if this OldDoFn has been requested to perform unique key checking, it will
+     * <p>Finally, if this {@link DoFn} has been requested to perform unique key checking, it will
      * throw an {@link IllegalStateException} if more than one key per window is found.
      */
     static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
+        extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
                              IsmRecord<WindowedValue<V>>> {
 
       private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
@@ -1452,7 +1450,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         this.uniqueKeysExpected = uniqueKeysExpected;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         long currentKeyIndex = 0;
         // We use one based indexing while counting
@@ -1557,7 +1555,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window of:
+     * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of:
        * <ul>
        *   <li>Key 1: META key</li>
        *   <li>Key 2: window</li>
@@ -1565,17 +1563,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
        *   <li>Value: sum of values for window</li>
        * </ul>
        *
-       * <p>This {@link OldDoFn} is meant to be used to compute the number of unique keys
+       * <p>This {@link DoFn} is meant to be used to compute the number of unique keys
        * per window for map and multimap side inputs.
        */
     static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
+        extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
       private final Coder<W> windowCoder;
       ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
         this.windowCoder = windowCoder;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         Iterator<KV<W, Long>> iterator = c.element().getValue().iterator();
         KV<W, Long> currentValue = iterator.next();
@@ -1606,7 +1604,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
+     * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
        * <ul>
        *   <li>Key 1: META key</li>
        *   <li>Key 2: window</li>
@@ -1614,11 +1612,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
        *   <li>Value: key</li>
        * </ul>
        *
-       * <p>This {@link OldDoFn} is meant to be used to output index to key records
+       * <p>This {@link DoFn} is meant to be used to output index to key records
        * per window for map and multimap side inputs.
        */
     static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
+        extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
 
       private final Coder<K> keyCoder;
       private final Coder<W> windowCoder;
@@ -1627,7 +1625,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         this.windowCoder = windowCoder;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) throws Exception {
         Iterator<KV<W, K>> iterator = c.element().getValue().iterator();
         KV<W, K> currentValue = iterator.next();
@@ -1658,7 +1656,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     /**
-     * A {@link OldDoFn} which partitions sets of elements by window boundaries. Within each
+     * A {@link DoFn} which partitions sets of elements by window boundaries. Within each
      * partition, the set of elements is transformed into a {@link TransformedMap}.
      * The transformed {@code Map<K, Iterable<V>>} is backed by a
      * {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function
@@ -1673,7 +1671,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
      * </ul>
      */
     static class ToMultimapDoFn<K, V, W extends BoundedWindow>
-        extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+        extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
                              IsmRecord<WindowedValue<TransformedMap<K,
                                                                     Iterable<WindowedValue<V>>,
                                                                     Iterable<V>>>>> {
@@ -1683,7 +1681,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         this.windowCoder = windowCoder;
       }
 
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c)
           throws Exception {
         Optional<Object> previousWindowStructuralValue = Optional.absent();
@@ -2335,8 +2333,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           // WindmillSink.
           .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
           .apply("StripIds", ParDo.of(
-              new OldDoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
-                @Override
+              new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   c.output(c.element().getValue().getValue());
                 }
@@ -2372,7 +2370,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * A specialized {@link OldDoFn} for writing the contents of a {@link PCollection}
+   * A specialized {@link DoFn} for writing the contents of a {@link PCollection}
    * to a streaming {@link PCollectionView} backend implementation.
    */
   private static class StreamingPCollectionViewWriterFn<T>
@@ -2553,8 +2551,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
   }
 
-  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
-    @Override
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(Arrays.asList(c.element()));
     }