You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:46 UTC

[32/50] incubator-beam git commit: Remove WindowingInternals support from DoFnReflector

Remove WindowingInternals support from DoFnReflector

The test themselves are replaced by mostly-hidden placeholders, to
ensure that our code for handling generic parameters remains in place
until new context parameters that use generics are added back.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20208d68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20208d68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20208d68

Branch: refs/heads/gearpump-runner
Commit: 20208d68142e756800507048d9b8339041f2db70
Parents: 063ff2f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Aug 9 20:42:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 10 10:00:40 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DoFn.java    |  44 +++++-
 .../beam/sdk/transforms/DoFnReflector.java      |  92 +++++++----
 .../beam/sdk/transforms/DoFnReflectorTest.java  | 157 ++++++++++++++-----
 .../transforms/DoFnReflectorBenchmark.java      |  13 +-
 4 files changed, 214 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 6f9a6b6..a06467e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -302,11 +301,43 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
     BoundedWindow window();
 
     /**
-     * Construct the {@link WindowingInternals} to use within a {@link DoFn} that
-     * needs it. This is called if the {@link ProcessElement} method has a parameter of type
-     * {@link WindowingInternals}.
+     * A placeholder for testing purposes. The return type itself is package-private and not
+     * implemented.
      */
-    WindowingInternals<InputT, OutputT> windowingInternals();
+    InputProvider<InputT> inputProvider();
+
+    /**
+     * A placeholder for testing purposes. The return type itself is package-private and not
+     * implemented.
+     */
+    OutputReceiver<OutputT> outputReceiver();
+  }
+
+  static interface OutputReceiver<T> {
+    void output(T output);
+  }
+
+  static interface InputProvider<T> {
+    T get();
+  }
+
+  /** For testing only, this {@link ExtraContextFactory} returns {@code null} for all parameters. */
+  public static class FakeExtraContextFactory<InputT, OutputT>
+      implements ExtraContextFactory<InputT, OutputT> {
+    @Override
+    public BoundedWindow window() {
+      return null;
+    }
+
+    @Override
+    public InputProvider<InputT> inputProvider() {
+      return null;
+    }
+
+    @Override
+    public OutputReceiver<OutputT> outputReceiver() {
+      return null;
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -331,8 +362,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <ul>
    *   <li>It must have at least one argument.
    *   <li>Its first argument must be a {@link DoFn.ProcessContext}.
-   *   <li>Its remaining arguments must be {@link BoundedWindow}, or
-   *   {@link WindowingInternals WindowingInternals&lt;InputT, OutputT&gt;}.
+   *   <li>Its remaining argument, if any, must be {@link BoundedWindow}.
    * </ul>
    */
   @Documented

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index c6168b3..3dfda55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -91,6 +91,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 
@@ -121,20 +122,35 @@ public abstract class DoFnReflector {
     /** Any {@link BoundedWindow} parameter is populated by the window of the current element. */
     WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") {
       @Override
-      public <InputT, OutputT> TypeToken<?>
-          tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
+      public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
         return TypeToken.of(BoundedWindow.class);
       }
     },
 
-    WINDOWING_INTERNALS(Availability.PROCESS_ELEMENT_ONLY,
-        WindowingInternals.class, "windowingInternals") {
+    INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") {
       @Override
-      public <InputT, OutputT> TypeToken<?> tokenFor(
-          TypeToken<InputT> in, TypeToken<OutputT> out) {
-        return new TypeToken<WindowingInternals<InputT, OutputT>>() {}
-            .where(new TypeParameter<InputT>() {}, in)
-            .where(new TypeParameter<OutputT>() {}, out);
+      public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
+        return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
+            new TypeParameter<InputT>() {}, in);
+      }
+
+      @Override
+      public boolean isHidden() {
+        return true;
+      }
+    },
+
+    OUTPUT_RECEIVER(
+        Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") {
+      @Override
+      public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
+        return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+            new TypeParameter<OutputT>() {}, out);
+      }
+
+      @Override
+      public boolean isHidden() {
+        return true;
       }
     };
 
@@ -146,6 +162,14 @@ public abstract class DoFnReflector {
     abstract <InputT, OutputT> TypeToken<?> tokenFor(
         TypeToken<InputT> in, TypeToken<OutputT> out);
 
+    /**
+     * Indicates whether this enum is for testing only, hence should not appear in error messages,
+     * etc. Defaults to {@code false}.
+     */
+    boolean isHidden() {
+      return false;
+    }
+
     private final Class<?> rawType;
     private final Availability availability;
     private final transient MethodDescription method;
@@ -241,16 +265,17 @@ public abstract class DoFnReflector {
       final TypeToken<?> in, final TypeToken<?> out) {
     return FluentIterable
         .from(extraProcessContexts.values())
+        .filter(new Predicate<AdditionalParameter>() {
+          @Override
+          public boolean apply(@Nonnull AdditionalParameter additionalParameter) {
+            return !additionalParameter.isHidden();
+          }
+        })
         .transform(new Function<AdditionalParameter, String>() {
-
           @Override
-          @Nullable
-          public String apply(@Nullable AdditionalParameter input) {
-            if (input == null) {
-              return null;
-            } else {
-              return formatType(input.tokenFor(in, out));
-            }
+          @Nonnull
+          public String apply(@Nonnull AdditionalParameter input) {
+            return formatType(input.tokenFor(in, out));
           }
         })
         .toSortedSet(String.CASE_INSENSITIVE_ORDER);
@@ -285,10 +310,9 @@ public abstract class DoFnReflector {
    * <li>The method has at least one argument.
    * <li>The first argument is of type firstContextArg.
    * <li>The remaining arguments have raw types that appear in {@code contexts}
-   * <li>Any generics on the extra context arguments match what is expected. Eg.,
-   *     {@code WindowingInternals<InputT, OutputT>} either matches the
-   *     {@code InputT} and {@code OutputT} parameters of the
-   *     {@code OldDoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
+   * <li>Any generics on the extra context arguments match what is expected. Currently, this
+   * is exercised only by placeholders. For example, {@code InputReceiver<InputT> must either match
+   * the {@code InputT} {@code OldDoFn<InputT, OutputT>.ProcessContext} or use a wildcard, etc.
    * </ol>
    *
    * @param m the method to verify
@@ -298,7 +322,8 @@ public abstract class DoFnReflector {
    * @param iParam TypeParameter representing the input type
    * @param oParam TypeParameter representing the output type
    */
-  @VisibleForTesting static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
+  @VisibleForTesting
+  static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
       Method m,
       Map<Class<?>, AdditionalParameter> contexts,
       TypeToken<?> firstContextArg,
@@ -607,11 +632,13 @@ public abstract class DoFnReflector {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get the windowingInternals in ProcessElements");
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
     }
   }
 
@@ -679,8 +706,13 @@ public abstract class DoFnReflector {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
+    public DoFn.InputProvider<InputT> inputProvider() {
+      throw new UnsupportedOperationException("inputProvider() exists only for testing");
+    }
+
+    @Override
+    public DoFn.OutputReceiver<OutputT> outputReceiver() {
+      throw new UnsupportedOperationException("outputReceiver() exists only for testing");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
index df9e441..c47e0cf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
 
 import org.junit.Before;
 import org.junit.Rule;
@@ -71,7 +70,9 @@ public class DoFnReflectorTest {
   @Mock
   private BoundedWindow mockWindow;
   @Mock
-  private WindowingInternals<String, String> mockWindowingInternals;
+  private DoFn.InputProvider<String> mockInputProvider;
+  @Mock
+  private DoFn.OutputReceiver<String> mockOutputReceiver;
 
   private ExtraContextFactory<String, String> extraContextFactory;
 
@@ -85,8 +86,13 @@ public class DoFnReflectorTest {
       }
 
       @Override
-      public WindowingInternals<String, String> windowingInternals() {
-        return mockWindowingInternals;
+      public DoFn.InputProvider<String> inputProvider() {
+        return mockInputProvider;
+      }
+
+      @Override
+      public DoFn.OutputReceiver<String> outputReceiver() {
+        return mockOutputReceiver;
       }
     };
   }
@@ -257,16 +263,35 @@ public class DoFnReflectorTest {
   }
 
   @Test
-  public void testDoFnWithWindowingInternals() throws Exception {
+  public void testDoFnWithOutputReceiver() throws Exception {
+    final Invocations invocations = new Invocations("AnonymousClass");
+    DoFnReflector reflector = underTest(new DoFn<String, String>() {
+
+      @ProcessElement
+      public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o)
+          throws Exception {
+        invocations.wasProcessElementInvoked = true;
+        assertSame(c, mockContext);
+        assertSame(o, mockOutputReceiver);
+      }
+    });
+
+    assertFalse(reflector.usesSingleWindow());
+
+    checkInvokeProcessElementWorks(reflector, invocations);
+  }
+
+  @Test
+  public void testDoFnWithInputProvider() throws Exception {
     final Invocations invocations = new Invocations("AnonymousClass");
     DoFnReflector reflector = underTest(new DoFn<String, String>() {
 
       @ProcessElement
-      public void processElement(ProcessContext c, WindowingInternals<String, String> w)
+      public void processElement(ProcessContext c, DoFn.InputProvider<String> i)
           throws Exception {
         invocations.wasProcessElementInvoked = true;
         assertSame(c, mockContext);
-        assertSame(w, mockWindowingInternals);
+        assertSame(i, mockInputProvider);
       }
     });
 
@@ -513,7 +538,7 @@ public class DoFnReflectorTest {
     thrown.expectMessage(
         "Integer is not a valid context parameter for method "
         + getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)"
-        + ". Should be one of [BoundedWindow, WindowingInternals<Integer, String>]");
+        + ". Should be one of [BoundedWindow]");
 
     DoFnReflector.verifyProcessMethodArguments(
         getClass().getDeclaredMethod("badExtraProcessContext",
@@ -534,102 +559,148 @@ public class DoFnReflectorTest {
   }
 
   @SuppressWarnings("unused")
-  private void goodGenerics(DoFn<Integer, String>.ProcessContext c,
-      WindowingInternals<Integer, String> i1) {}
+  private void goodGenerics(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<String> output) {}
 
   @Test
   public void testValidGenerics() throws Exception {
-    Method method = getClass().getDeclaredMethod("goodGenerics",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "goodGenerics",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void goodWildcards(DoFn<Integer, String>.ProcessContext c,
-      WindowingInternals<?, ?> i1) {}
+  private void goodWildcards(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<?> input,
+      DoFn.OutputReceiver<?> output) {}
 
   @Test
   public void testGoodWildcards() throws Exception {
-    Method method = getClass().getDeclaredMethod("goodWildcards",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "goodWildcards",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void goodBoundedWildcards(DoFn<Integer, String>.ProcessContext c,
-      WindowingInternals<? super Integer, ? super String> i1) {}
+  private void goodBoundedWildcards(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<? super Integer> input,
+      DoFn.OutputReceiver<? super String> output) {}
 
   @Test
   public void testGoodBoundedWildcards() throws Exception {
-    Method method = getClass().getDeclaredMethod("goodBoundedWildcards",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "goodBoundedWildcards",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
   private <InputT, OutputT> void goodTypeVariables(
       DoFn<InputT, OutputT>.ProcessContext c,
-      WindowingInternals<InputT, OutputT> i1) {}
+      DoFn.InputProvider<InputT> input,
+      DoFn.OutputReceiver<OutputT> output) {}
 
   @Test
   public void testGoodTypeVariables() throws Exception {
-    Method method = getClass().getDeclaredMethod("goodTypeVariables",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "goodTypeVariables",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void badGenericTwoArgs(DoFn<Integer, String>.ProcessContext c,
-      WindowingInternals<Integer, Integer> i1) {}
+  private void badGenericTwoArgs(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<Integer> output) {}
 
   @Test
   public void testBadGenericsTwoArgs() throws Exception {
-    Method method = getClass().getDeclaredMethod("badGenericTwoArgs",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badGenericTwoArgs",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Incompatible generics in context parameter "
-        + "WindowingInternals<Integer, Integer> "
+        + "OutputReceiver<Integer> "
         + "for method " + getClass().getName()
-        + "#badGenericTwoArgs(ProcessContext, WindowingInternals). Should be "
-        + "WindowingInternals<Integer, String>");
+        + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be "
+        + "OutputReceiver<String>");
 
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
-  private void badGenericWildCards(DoFn<Integer, String>.ProcessContext c,
-      WindowingInternals<Integer, ? super Integer> i1) {}
+  private void badGenericWildCards(
+      DoFn<Integer, String>.ProcessContext c,
+      DoFn.InputProvider<Integer> input,
+      DoFn.OutputReceiver<? super Integer> output) {}
 
   @Test
   public void testBadGenericWildCards() throws Exception {
-    Method method = getClass().getDeclaredMethod("badGenericWildCards",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badGenericWildCards",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Incompatible generics in context parameter "
-        + "WindowingInternals<Integer, ? super Integer> for method "
+        + "OutputReceiver<? super Integer> for method "
         + getClass().getName()
-        + "#badGenericWildCards(ProcessContext, WindowingInternals). Should be "
-        + "WindowingInternals<Integer, String>");
+        + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be "
+        + "OutputReceiver<String>");
 
     DoFnReflector.verifyProcessMethodArguments(method);
   }
 
   @SuppressWarnings("unused")
   private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
-      WindowingInternals<InputT, InputT> i1) {}
+      DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {}
 
   @Test
   public void testBadTypeVariables() throws Exception {
-    Method method = getClass().getDeclaredMethod("badTypeVariables",
-        DoFn.ProcessContext.class, WindowingInternals.class);
+    Method method =
+        getClass()
+            .getDeclaredMethod(
+                "badTypeVariables",
+                DoFn.ProcessContext.class,
+                DoFn.InputProvider.class,
+                DoFn.OutputReceiver.class);
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("Incompatible generics in context parameter "
-        + "WindowingInternals<InputT, InputT> for method " + getClass().getName()
-        + "#badTypeVariables(ProcessContext, WindowingInternals). Should be "
-        + "WindowingInternals<InputT, OutputT>");
+        + "OutputReceiver<InputT> for method " + getClass().getName()
+        + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be "
+        + "OutputReceiver<OutputT>");
 
     DoFnReflector.verifyProcessMethodArguments(method);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
index 233b8be..91ecd16 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
@@ -58,18 +58,7 @@ public class DoFnReflectorBenchmark {
   private StubDoFnProcessContext stubDoFnContext =
       new StubDoFnProcessContext(doFn, ELEMENT);
   private ExtraContextFactory<String, String> extraContextFactory =
-      new ExtraContextFactory<String, String>() {
-
-    @Override
-    public BoundedWindow window() {
-      return null;
-    }
-
-    @Override
-    public WindowingInternals<String, String> windowingInternals() {
-      return null;
-    }
-  };
+      new DoFn.FakeExtraContextFactory<>();
 
   private DoFnReflector doFnReflector;
   private OldDoFn<String, String> adaptedDoFnWithContext;