You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:41:24 UTC

[34/50] [abbrv] incubator-beam git commit: Supports window parameter in DoFnTester

Supports window parameter in DoFnTester

Also prohibits all other extra parameters of the @ProcessElement method, and
prohibits output from bundle methods (whereas previously such output was silently dropped).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78ac009b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78ac009b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78ac009b

Branch: refs/heads/gearpump-runner
Commit: 78ac009be743a2e053580e9966f841174b636e88
Parents: 9645576
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 2 11:39:48 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 15:42:33 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 166 ++++++++++++++-----
 .../beam/sdk/transforms/DoFnTesterTest.java     |  34 ++++
 2 files changed, 158 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78ac009b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index a9f93dd..7c1abef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -38,13 +38,18 @@ import org.apache.beam.sdk.testing.ValueInSingleWindow;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
@@ -84,6 +89,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /**
    * Returns a {@code DoFnTester} supporting unit-testing of the given
    * {@link DoFn}. By default, uses {@link CloningBehavior#CLONE_ONCE}.
+   *
+   * <p>The only supported extra parameter of the {@link DoFn.ProcessElement} method is
+   * {@link BoundedWindow}.
    */
   @SuppressWarnings("unchecked")
   public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
@@ -236,7 +244,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     if (state == State.UNINITIALIZED) {
       initializeState();
     }
-    TestContext context = createContext(fn);
+    TestContext context = new TestContext();
     context.setupDelegateAggregators();
     // State and timer internals are per-bundle.
     stateInternals = InMemoryStateInternals.forKey(new Object());
@@ -262,7 +270,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /**
    * Calls the {@link DoFn.ProcessElement} method on the {@link DoFn} under test, in a
    * context where {@link DoFn.ProcessContext#element} returns the
-   * given element.
+   * given element and the element is in the global window.
    *
    * <p>Will call {@link #startBundle} automatically, if it hasn't
    * already been called.
@@ -277,26 +285,86 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   /**
    * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a
    * context where {@link DoFn.ProcessContext#element} returns the
-   * given element and timestamp.
+   * given element and timestamp and the element is in the global window.
    *
    * <p>Will call {@link #startBundle} automatically, if it hasn't
    * already been called.
-   *
-   * <p>If the input timestamp is {@literal null}, the minimum timestamp will be used.
    */
   public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception {
     checkNotNull(element, "Timestamped element cannot be null");
+    processWindowedElement(
+        element.getValue(), element.getTimestamp(), GlobalWindow.INSTANCE);
+  }
+
+  /**
+   * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a
+   * context where {@link DoFn.ProcessContext#element} returns the
+   * given element and timestamp and the element is in the given window.
+   *
+   * <p>Will call {@link #startBundle} automatically, if it hasn't
+   * already been called.
+   */
+  public void processWindowedElement(
+      InputT element, Instant timestamp, final BoundedWindow window) throws Exception {
     if (state != State.BUNDLE_STARTED) {
       startBundle();
     }
     try {
-      final TestProcessContext processContext = createProcessContext(element);
-      fnInvoker.invokeProcessElement(new DoFnInvoker.FakeArgumentProvider<InputT, OutputT>() {
-        @Override
-        public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-          return processContext;
-        }
-      });
+      final TestProcessContext processContext =
+          new TestProcessContext(
+              ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING));
+      fnInvoker.invokeProcessElement(
+          new DoFnInvoker.ArgumentProvider<InputT, OutputT>() {
+            @Override
+            public BoundedWindow window() {
+              return window;
+            }
+
+            @Override
+            public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) {
+              throw new UnsupportedOperationException(
+                  "Not expected to access DoFn.Context from @ProcessElement");
+            }
+
+            @Override
+            public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+              return processContext;
+            }
+
+            @Override
+            public DoFn.InputProvider<InputT> inputProvider() {
+              throw new UnsupportedOperationException(
+                  "Not expected to access InputProvider from DoFnTester");
+            }
+
+            @Override
+            public DoFn.OutputReceiver<OutputT> outputReceiver() {
+              throw new UnsupportedOperationException(
+                  "Not expected to access OutputReceiver from DoFnTester");
+            }
+
+            @Override
+            public WindowingInternals<InputT, OutputT> windowingInternals() {
+              throw new UnsupportedOperationException(
+                  "Not expected to access WindowingInternals from a new DoFn");
+            }
+
+            @Override
+            public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+              throw new UnsupportedOperationException(
+                  "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");
+            }
+
+            @Override
+            public org.apache.beam.sdk.util.state.State state(String stateId) {
+              throw new UnsupportedOperationException("DoFnTester doesn't support state yet");
+            }
+
+            @Override
+            public Timer timer(String timerId) {
+              throw new UnsupportedOperationException("DoFnTester doesn't support timers yet");
+            }
+          });
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
@@ -318,7 +386,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
         "Must be inside bundle to call finishBundle, but was: %s",
         state);
     try {
-      fnInvoker.invokeFinishBundle(createContext(fn));
+      fnInvoker.invokeFinishBundle(new TestContext());
     } catch (UserCodeException e) {
       unwrapUserCodeException(e);
     }
@@ -543,10 +611,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return mainOutputTag;
   }
 
-  private TestContext createContext(DoFn<InputT, OutputT> fn) {
-    return new TestContext();
-  }
-
   private class TestContext extends DoFn<InputT, OutputT>.Context {
     TestContext() {
       fn.super();
@@ -559,12 +623,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
     @Override
     public void output(OutputT output) {
-      sideOutput(mainOutputTag, output);
+      throwUnsupportedOutputFromBundleMethods();
     }
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      sideOutputWithTimestamp(mainOutputTag, output, timestamp);
+      throwUnsupportedOutputFromBundleMethods();
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+      throwUnsupportedOutputFromBundleMethods();
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      throwUnsupportedOutputFromBundleMethods();
+    }
+
+    private void throwUnsupportedOutputFromBundleMethods() {
+      throw new UnsupportedOperationException(
+          "DoFnTester doesn't support output from bundle methods");
     }
 
     @Override
@@ -613,26 +692,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       }
       return aggregator;
     }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE);
-    }
-
-    public <T> void noteOutput(TupleTag<T> tag, ValueInSingleWindow<T> output) {
-      getMutableOutput(tag).add(output);
-    }
-  }
-
-  private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) {
-    return new TestProcessContext(
-        ValueInSingleWindow.of(
-            elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
   }
 
   private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
@@ -641,7 +700,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
     private TestProcessContext(ValueInSingleWindow<InputT> element) {
       fn.super();
-      this.context = createContext(fn);
+      this.context = new TestContext();
       this.element = element;
     }
 
@@ -699,8 +758,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.noteOutput(
-          tag, ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
+      getMutableOutput(tag)
+          .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
     }
 
     @Override
@@ -772,6 +831,29 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
   private DoFnTester(DoFn<InputT, OutputT> origFn) {
     this.origFn = origFn;
+    DoFnSignature signature = DoFnSignatures.signatureForDoFn(origFn);
+    for (DoFnSignature.Parameter param : signature.processElement().extraParameters()) {
+      param.match(
+          new DoFnSignature.Parameter.Cases.WithDefault<Void>() {
+            @Override
+            public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) {
+              // ProcessContext parameter is obviously supported.
+              return null;
+            }
+
+            @Override
+            public Void dispatch(DoFnSignature.Parameter.WindowParameter p) {
+              // We also support the BoundedWindow parameter.
+              return null;
+            }
+
+            @Override
+            protected Void dispatchDefault(DoFnSignature.Parameter p) {
+              throw new UnsupportedOperationException(
+                  "Parameter " + p + " not supported by DoFnTester");
+            }
+          });
+    }
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78ac009b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index ff8a9bc..b47465e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -30,13 +30,16 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -350,6 +353,37 @@ public class DoFnTesterTest {
     }
   }
 
+  @Test
+  public void testSupportsWindowParameter() throws Exception {
+    Instant now = Instant.now();
+    try (DoFnTester<Integer, KV<Integer, BoundedWindow>> tester =
+        DoFnTester.of(new DoFnWithWindowParameter())) {
+      BoundedWindow firstWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(1)));
+      tester.processWindowedElement(1, now, firstWindow);
+      tester.processWindowedElement(2, now, firstWindow);
+      BoundedWindow secondWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(4)));
+      tester.processWindowedElement(3, now, secondWindow);
+      tester.finishBundle();
+
+      assertThat(
+          tester.peekOutputElementsInWindow(firstWindow),
+          containsInAnyOrder(
+              TimestampedValue.of(KV.of(1, firstWindow), now),
+              TimestampedValue.of(KV.of(2, firstWindow), now)));
+      assertThat(
+          tester.peekOutputElementsInWindow(secondWindow),
+          containsInAnyOrder(
+              TimestampedValue.of(KV.of(3, secondWindow), now)));
+    }
+  }
+
+  private static class DoFnWithWindowParameter extends DoFn<Integer, KV<Integer, BoundedWindow>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      c.output(KV.of(c.element(), window));
+    }
+  }
+
   private static class SideInputDoFn extends DoFn<Integer, Integer> {
     private final PCollectionView<Integer> value;