You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/08 17:11:14 UTC

[1/2] incubator-beam git commit: Remove misc occurrences of OldDoFn

Repository: incubator-beam
Updated Branches:
  refs/heads/master 92ff63d3b -> 04a41ee54


Remove misc occurrences of OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44e17d1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44e17d1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44e17d1c

Branch: refs/heads/master
Commit: 44e17d1c97babd487584cc78690505bdf57704b2
Parents: 92ff63d
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 14:17:01 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Dec 8 09:03:59 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/AggregatorPipelineExtractor.java   |  5 ++--
 .../sdk/transforms/AggregatorRetriever.java     |  2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  4 +--
 .../apache/beam/sdk/util/ExecutionContext.java  |  8 +++---
 .../sdk/AggregatorPipelineExtractorTest.java    | 20 +++++++-------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  2 +-
 .../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++++++++++----------
 7 files changed, 35 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
index d2130d0..ade5978 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -70,9 +70,10 @@ class AggregatorPipelineExtractor {
     private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
       if (transform != null) {
         if (transform instanceof ParDo.Bound) {
-          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
+          return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getNewFn());
         } else if (transform instanceof ParDo.BoundMulti) {
-          return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
+          return AggregatorRetriever.getAggregators(
+              ((ParDo.BoundMulti<?, ?>) transform).getNewFn());
         }
       }
       return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index abed843..ce47e22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -30,7 +30,7 @@ public final class AggregatorRetriever {
   /**
    * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
    */
-  public static Collection<Aggregator<?, ?>> getAggregators(OldDoFn<?, ?> fn) {
+  public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
     return fn.getAggregators();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index be063e2..4127d94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2392,8 +2392,8 @@ public class Combine {
         PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
 
       PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
-          new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
-            @Override
+          new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
+            @ProcessElement
             public void processElement(final ProcessContext c) {
               K key = c.element().getKey();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
index f2a79bd..4429d76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
@@ -41,14 +41,14 @@ public interface ExecutionContext {
 
   /**
    * Hook for subclasses to implement that will be called whenever
-   * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#output}
+   * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
    * is called.
    */
   void noteOutput(WindowedValue<?> output);
 
   /**
    * Hook for subclasses to implement that will be called whenever
-   * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#sideOutput}
+   * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
    * is called.
    */
   void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
@@ -70,14 +70,14 @@ public interface ExecutionContext {
 
     /**
      * Hook for subclasses to implement that will be called whenever
-     * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#output}
+     * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
      * is called.
      */
     void noteOutput(WindowedValue<?> output);
 
     /**
      * Hook for subclasses to implement that will be called whenever
-     * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#sideOutput}
+     * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
      * is called.
      */
     void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index b4de768..c4e9b8a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -33,9 +33,9 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
 import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -68,7 +68,7 @@ public class AggregatorPipelineExtractorTest {
     @SuppressWarnings("rawtypes")
     ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
     AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getFn()).thenReturn(fn);
+    when(bound.getNewFn()).thenReturn(fn);
 
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
     Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
@@ -96,7 +96,7 @@ public class AggregatorPipelineExtractorTest {
     @SuppressWarnings("rawtypes")
     ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti");
     AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getFn()).thenReturn(fn);
+    when(bound.getNewFn()).thenReturn(fn);
 
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
     Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -126,8 +126,8 @@ public class AggregatorPipelineExtractorTest {
     @SuppressWarnings("rawtypes")
     ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
     AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
-    when(bound.getFn()).thenReturn(fn);
-    when(otherBound.getFn()).thenReturn(fn);
+    when(bound.getNewFn()).thenReturn(fn);
+    when(otherBound.getNewFn()).thenReturn(fn);
 
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
     Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -162,7 +162,7 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
 
-    when(bound.getFn()).thenReturn(fn);
+    when(bound.getNewFn()).thenReturn(fn);
 
     @SuppressWarnings("rawtypes")
     ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
@@ -170,7 +170,7 @@ public class AggregatorPipelineExtractorTest {
     AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
     Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
 
-    when(otherBound.getFn()).thenReturn(otherFn);
+    when(otherBound.getNewFn()).thenReturn(otherFn);
 
     TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
@@ -208,7 +208,7 @@ public class AggregatorPipelineExtractorTest {
     }
   }
 
-  private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> {
+  private static class AggregatorProvidingDoFn<InT, OuT> extends DoFn<InT, OuT> {
     public <InputT, OutT> Aggregator<InputT, OutT> addAggregator(
         CombineFn<InputT, ?, OutT> combiner) {
       return createAggregator(randomName(), combiner);
@@ -218,8 +218,8 @@ public class AggregatorPipelineExtractorTest {
       return UUID.randomUUID().toString();
     }
 
-    @Override
-    public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception {
+    @ProcessElement
+    public void processElement(DoFn<InT, OuT>.ProcessContext c) throws Exception {
       fail();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index b47465e..2dafa27 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -399,7 +399,7 @@ public class DoFnTesterTest {
 
   /**
    * A {@link DoFn} that adds values to an aggregator and converts input to String in
-   * {@link OldDoFn#processElement}.
+   * {@link DoFn.ProcessElement @ProcessElement}.
    */
   private static class CounterDoFn extends DoFn<Long, String> {
     Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index f69c867..9bc8a64 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -50,7 +50,7 @@ public class ParDoLifecycleTest implements Serializable {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
         .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()));
+        .apply(ParDo.of(new CallSequenceEnforcingDoFn<Integer>()));
 
     p.run();
   }
@@ -62,19 +62,19 @@ public class ParDoLifecycleTest implements Serializable {
     PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
         .and(p.apply("Polite", Create.of(3, 5, 6, 7)))
         .apply(Flatten.<Integer>pCollections())
-        .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())
+        .apply(ParDo.of(new CallSequenceEnforcingDoFn<Integer>())
             .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty()));
 
     p.run();
   }
 
-  private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> {
+  private static class CallSequenceEnforcingDoFn<T> extends DoFn<T, T> {
     private boolean setupCalled = false;
     private int startBundleCalls = 0;
     private int finishBundleCalls = 0;
     private boolean teardownCalled = false;
 
-    @Override
+    @Setup
     public void setup() {
       assertThat("setup should not be called twice", setupCalled, is(false));
       assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
@@ -83,7 +83,7 @@ public class ParDoLifecycleTest implements Serializable {
       setupCalled = true;
     }
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) {
       assertThat("setup should have been called", setupCalled, is(true));
       assertThat(
@@ -94,7 +94,7 @@ public class ParDoLifecycleTest implements Serializable {
       startBundleCalls++;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
       assertThat(
@@ -104,7 +104,7 @@ public class ParDoLifecycleTest implements Serializable {
       assertThat("teardown should not have been called", teardownCalled, is(false));
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) {
       assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
       assertThat(
@@ -115,7 +115,7 @@ public class ParDoLifecycleTest implements Serializable {
       finishBundleCalls++;
     }
 
-    @Override
+    @Teardown
     public void teardown() {
       assertThat(setupCalled, is(true));
       assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
@@ -345,7 +345,7 @@ public class ParDoLifecycleTest implements Serializable {
     }
   }
 
-  private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> {
+  private static class ExceptionThrowingOldFn extends DoFn<Object, Object> {
     static AtomicBoolean teardownCalled = new AtomicBoolean(false);
 
     private final MethodForException toThrow;
@@ -355,22 +355,22 @@ public class ParDoLifecycleTest implements Serializable {
       this.toThrow = toThrow;
     }
 
-    @Override
+    @Setup
     public void setup() throws Exception {
       throwIfNecessary(MethodForException.SETUP);
     }
 
-    @Override
+    @StartBundle
     public void startBundle(Context c) throws Exception {
       throwIfNecessary(MethodForException.START_BUNDLE);
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       throwIfNecessary(MethodForException.PROCESS_ELEMENT);
     }
 
-    @Override
+    @FinishBundle
     public void finishBundle(Context c) throws Exception {
       throwIfNecessary(MethodForException.FINISH_BUNDLE);
     }
@@ -382,7 +382,7 @@ public class ParDoLifecycleTest implements Serializable {
       }
     }
 
-    @Override
+    @Teardown
     public void teardown() {
       if (!thrown) {
         fail("Excepted to have a processing method throw an exception");


[2/2] incubator-beam git commit: [BEAM-498] Remove misc occurrences of OldDoFn

Posted by lc...@apache.org.
[BEAM-498] Remove misc occurrences of OldDoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/04a41ee5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/04a41ee5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/04a41ee5

Branch: refs/heads/master
Commit: 04a41ee54ced13e7d896b7d31ffa99a3273af2dd
Parents: 92ff63d 44e17d1
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 8 09:04:27 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Dec 8 09:04:27 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/AggregatorPipelineExtractor.java   |  5 ++--
 .../sdk/transforms/AggregatorRetriever.java     |  2 +-
 .../org/apache/beam/sdk/transforms/Combine.java |  4 +--
 .../apache/beam/sdk/util/ExecutionContext.java  |  8 +++---
 .../sdk/AggregatorPipelineExtractorTest.java    | 20 +++++++-------
 .../beam/sdk/transforms/DoFnTesterTest.java     |  2 +-
 .../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++++++++++----------
 7 files changed, 35 insertions(+), 34 deletions(-)
----------------------------------------------------------------------