You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:46 UTC
[32/50] incubator-beam git commit: Remove WindowingInternals support
from DoFnReflector
Remove WindowingInternals support from DoFnReflector
The tests themselves are replaced by mostly-hidden placeholders, to
ensure that our code for handling generic parameters remains in place
until new context parameters that use generics are added back.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/20208d68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/20208d68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/20208d68
Branch: refs/heads/gearpump-runner
Commit: 20208d68142e756800507048d9b8339041f2db70
Parents: 063ff2f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Aug 9 20:42:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 10 10:00:40 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 44 +++++-
.../beam/sdk/transforms/DoFnReflector.java | 92 +++++++----
.../beam/sdk/transforms/DoFnReflectorTest.java | 157 ++++++++++++++-----
.../transforms/DoFnReflectorBenchmark.java | 13 +-
4 files changed, 214 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 6f9a6b6..a06467e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -302,11 +301,43 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
BoundedWindow window();
/**
- * Construct the {@link WindowingInternals} to use within a {@link DoFn} that
- * needs it. This is called if the {@link ProcessElement} method has a parameter of type
- * {@link WindowingInternals}.
+ * A placeholder for testing purposes. The return type itself is package-private and not
+ * implemented.
*/
- WindowingInternals<InputT, OutputT> windowingInternals();
+ InputProvider<InputT> inputProvider();
+
+ /**
+ * A placeholder for testing purposes. The return type itself is package-private and not
+ * implemented.
+ */
+ OutputReceiver<OutputT> outputReceiver();
+ }
+
+ static interface OutputReceiver<T> {
+ void output(T output);
+ }
+
+ static interface InputProvider<T> {
+ T get();
+ }
+
+ /** For testing only, this {@link ExtraContextFactory} returns {@code null} for all parameters. */
+ public static class FakeExtraContextFactory<InputT, OutputT>
+ implements ExtraContextFactory<InputT, OutputT> {
+ @Override
+ public BoundedWindow window() {
+ return null;
+ }
+
+ @Override
+ public InputProvider<InputT> inputProvider() {
+ return null;
+ }
+
+ @Override
+ public OutputReceiver<OutputT> outputReceiver() {
+ return null;
+ }
}
/////////////////////////////////////////////////////////////////////////////
@@ -331,8 +362,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <ul>
* <li>It must have at least one argument.
* <li>Its first argument must be a {@link DoFn.ProcessContext}.
- * <li>Its remaining arguments must be {@link BoundedWindow}, or
- * {@link WindowingInternals WindowingInternals<InputT, OutputT>}.
+ * <li>Its remaining argument, if any, must be {@link BoundedWindow}.
* </ul>
*/
@Documented
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
index c6168b3..3dfda55 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -91,6 +91,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -121,20 +122,35 @@ public abstract class DoFnReflector {
/** Any {@link BoundedWindow} parameter is populated by the window of the current element. */
WINDOW_OF_ELEMENT(Availability.PROCESS_ELEMENT_ONLY, BoundedWindow.class, "window") {
@Override
- public <InputT, OutputT> TypeToken<?>
- tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
+ public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
return TypeToken.of(BoundedWindow.class);
}
},
- WINDOWING_INTERNALS(Availability.PROCESS_ELEMENT_ONLY,
- WindowingInternals.class, "windowingInternals") {
+ INPUT_PROVIDER(Availability.PROCESS_ELEMENT_ONLY, DoFn.InputProvider.class, "inputProvider") {
@Override
- public <InputT, OutputT> TypeToken<?> tokenFor(
- TypeToken<InputT> in, TypeToken<OutputT> out) {
- return new TypeToken<WindowingInternals<InputT, OutputT>>() {}
- .where(new TypeParameter<InputT>() {}, in)
- .where(new TypeParameter<OutputT>() {}, out);
+ public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
+ return new TypeToken<DoFn.InputProvider<InputT>>() {}.where(
+ new TypeParameter<InputT>() {}, in);
+ }
+
+ @Override
+ public boolean isHidden() {
+ return true;
+ }
+ },
+
+ OUTPUT_RECEIVER(
+ Availability.PROCESS_ELEMENT_ONLY, DoFn.OutputReceiver.class, "outputReceiver") {
+ @Override
+ public <InputT, OutputT> TypeToken<?> tokenFor(TypeToken<InputT> in, TypeToken<OutputT> out) {
+ return new TypeToken<DoFn.OutputReceiver<OutputT>>() {}.where(
+ new TypeParameter<OutputT>() {}, out);
+ }
+
+ @Override
+ public boolean isHidden() {
+ return true;
}
};
@@ -146,6 +162,14 @@ public abstract class DoFnReflector {
abstract <InputT, OutputT> TypeToken<?> tokenFor(
TypeToken<InputT> in, TypeToken<OutputT> out);
+ /**
+ * Indicates whether this enum is for testing only, hence should not appear in error messages,
+ * etc. Defaults to {@code false}.
+ */
+ boolean isHidden() {
+ return false;
+ }
+
private final Class<?> rawType;
private final Availability availability;
private final transient MethodDescription method;
@@ -241,16 +265,17 @@ public abstract class DoFnReflector {
final TypeToken<?> in, final TypeToken<?> out) {
return FluentIterable
.from(extraProcessContexts.values())
+ .filter(new Predicate<AdditionalParameter>() {
+ @Override
+ public boolean apply(@Nonnull AdditionalParameter additionalParameter) {
+ return !additionalParameter.isHidden();
+ }
+ })
.transform(new Function<AdditionalParameter, String>() {
-
@Override
- @Nullable
- public String apply(@Nullable AdditionalParameter input) {
- if (input == null) {
- return null;
- } else {
- return formatType(input.tokenFor(in, out));
- }
+ @Nonnull
+ public String apply(@Nonnull AdditionalParameter input) {
+ return formatType(input.tokenFor(in, out));
}
})
.toSortedSet(String.CASE_INSENSITIVE_ORDER);
@@ -285,10 +310,9 @@ public abstract class DoFnReflector {
* <li>The method has at least one argument.
* <li>The first argument is of type firstContextArg.
* <li>The remaining arguments have raw types that appear in {@code contexts}
- * <li>Any generics on the extra context arguments match what is expected. Eg.,
- * {@code WindowingInternals<InputT, OutputT>} either matches the
- * {@code InputT} and {@code OutputT} parameters of the
- * {@code OldDoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc.
+ * <li>Any generics on the extra context arguments match what is expected. Currently, this
+ * is exercised only by placeholders. For example, {@code InputProvider<InputT>} must either match
+ * the {@code InputT} of {@code OldDoFn<InputT, OutputT>.ProcessContext} or use a wildcard, etc.
* </ol>
*
* @param m the method to verify
@@ -298,7 +322,8 @@ public abstract class DoFnReflector {
* @param iParam TypeParameter representing the input type
* @param oParam TypeParameter representing the output type
*/
- @VisibleForTesting static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
+ @VisibleForTesting
+ static <InputT, OutputT> List<AdditionalParameter> verifyMethodArguments(
Method m,
Map<Class<?>, AdditionalParameter> contexts,
TypeToken<?> firstContextArg,
@@ -607,11 +632,13 @@ public abstract class DoFnReflector {
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get the windowingInternals in ProcessElements");
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
}
}
@@ -679,8 +706,13 @@ public abstract class DoFnReflector {
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return context.windowingInternals();
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
index df9e441..c47e0cf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.dofnreflector.DoFnReflectorTestHelper;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.junit.Before;
import org.junit.Rule;
@@ -71,7 +70,9 @@ public class DoFnReflectorTest {
@Mock
private BoundedWindow mockWindow;
@Mock
- private WindowingInternals<String, String> mockWindowingInternals;
+ private DoFn.InputProvider<String> mockInputProvider;
+ @Mock
+ private DoFn.OutputReceiver<String> mockOutputReceiver;
private ExtraContextFactory<String, String> extraContextFactory;
@@ -85,8 +86,13 @@ public class DoFnReflectorTest {
}
@Override
- public WindowingInternals<String, String> windowingInternals() {
- return mockWindowingInternals;
+ public DoFn.InputProvider<String> inputProvider() {
+ return mockInputProvider;
+ }
+
+ @Override
+ public DoFn.OutputReceiver<String> outputReceiver() {
+ return mockOutputReceiver;
}
};
}
@@ -257,16 +263,35 @@ public class DoFnReflectorTest {
}
@Test
- public void testDoFnWithWindowingInternals() throws Exception {
+ public void testDoFnWithOutputReceiver() throws Exception {
+ final Invocations invocations = new Invocations("AnonymousClass");
+ DoFnReflector reflector = underTest(new DoFn<String, String>() {
+
+ @ProcessElement
+ public void processElement(ProcessContext c, DoFn.OutputReceiver<String> o)
+ throws Exception {
+ invocations.wasProcessElementInvoked = true;
+ assertSame(c, mockContext);
+ assertSame(o, mockOutputReceiver);
+ }
+ });
+
+ assertFalse(reflector.usesSingleWindow());
+
+ checkInvokeProcessElementWorks(reflector, invocations);
+ }
+
+ @Test
+ public void testDoFnWithInputProvider() throws Exception {
final Invocations invocations = new Invocations("AnonymousClass");
DoFnReflector reflector = underTest(new DoFn<String, String>() {
@ProcessElement
- public void processElement(ProcessContext c, WindowingInternals<String, String> w)
+ public void processElement(ProcessContext c, DoFn.InputProvider<String> i)
throws Exception {
invocations.wasProcessElementInvoked = true;
assertSame(c, mockContext);
- assertSame(w, mockWindowingInternals);
+ assertSame(i, mockInputProvider);
}
});
@@ -513,7 +538,7 @@ public class DoFnReflectorTest {
thrown.expectMessage(
"Integer is not a valid context parameter for method "
+ getClass().getName() + "#badExtraProcessContext(ProcessContext, Integer)"
- + ". Should be one of [BoundedWindow, WindowingInternals<Integer, String>]");
+ + ". Should be one of [BoundedWindow]");
DoFnReflector.verifyProcessMethodArguments(
getClass().getDeclaredMethod("badExtraProcessContext",
@@ -534,102 +559,148 @@ public class DoFnReflectorTest {
}
@SuppressWarnings("unused")
- private void goodGenerics(DoFn<Integer, String>.ProcessContext c,
- WindowingInternals<Integer, String> i1) {}
+ private void goodGenerics(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<String> output) {}
@Test
public void testValidGenerics() throws Exception {
- Method method = getClass().getDeclaredMethod("goodGenerics",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "goodGenerics",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void goodWildcards(DoFn<Integer, String>.ProcessContext c,
- WindowingInternals<?, ?> i1) {}
+ private void goodWildcards(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<?> input,
+ DoFn.OutputReceiver<?> output) {}
@Test
public void testGoodWildcards() throws Exception {
- Method method = getClass().getDeclaredMethod("goodWildcards",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "goodWildcards",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void goodBoundedWildcards(DoFn<Integer, String>.ProcessContext c,
- WindowingInternals<? super Integer, ? super String> i1) {}
+ private void goodBoundedWildcards(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<? super Integer> input,
+ DoFn.OutputReceiver<? super String> output) {}
@Test
public void testGoodBoundedWildcards() throws Exception {
- Method method = getClass().getDeclaredMethod("goodBoundedWildcards",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "goodBoundedWildcards",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
private <InputT, OutputT> void goodTypeVariables(
DoFn<InputT, OutputT>.ProcessContext c,
- WindowingInternals<InputT, OutputT> i1) {}
+ DoFn.InputProvider<InputT> input,
+ DoFn.OutputReceiver<OutputT> output) {}
@Test
public void testGoodTypeVariables() throws Exception {
- Method method = getClass().getDeclaredMethod("goodTypeVariables",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "goodTypeVariables",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void badGenericTwoArgs(DoFn<Integer, String>.ProcessContext c,
- WindowingInternals<Integer, Integer> i1) {}
+ private void badGenericTwoArgs(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<Integer> output) {}
@Test
public void testBadGenericsTwoArgs() throws Exception {
- Method method = getClass().getDeclaredMethod("badGenericTwoArgs",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "badGenericTwoArgs",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Incompatible generics in context parameter "
- + "WindowingInternals<Integer, Integer> "
+ + "OutputReceiver<Integer> "
+ "for method " + getClass().getName()
- + "#badGenericTwoArgs(ProcessContext, WindowingInternals). Should be "
- + "WindowingInternals<Integer, String>");
+ + "#badGenericTwoArgs(ProcessContext, InputProvider, OutputReceiver). Should be "
+ + "OutputReceiver<String>");
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
- private void badGenericWildCards(DoFn<Integer, String>.ProcessContext c,
- WindowingInternals<Integer, ? super Integer> i1) {}
+ private void badGenericWildCards(
+ DoFn<Integer, String>.ProcessContext c,
+ DoFn.InputProvider<Integer> input,
+ DoFn.OutputReceiver<? super Integer> output) {}
@Test
public void testBadGenericWildCards() throws Exception {
- Method method = getClass().getDeclaredMethod("badGenericWildCards",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "badGenericWildCards",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Incompatible generics in context parameter "
- + "WindowingInternals<Integer, ? super Integer> for method "
+ + "OutputReceiver<? super Integer> for method "
+ getClass().getName()
- + "#badGenericWildCards(ProcessContext, WindowingInternals). Should be "
- + "WindowingInternals<Integer, String>");
+ + "#badGenericWildCards(ProcessContext, InputProvider, OutputReceiver). Should be "
+ + "OutputReceiver<String>");
DoFnReflector.verifyProcessMethodArguments(method);
}
@SuppressWarnings("unused")
private <InputT, OutputT> void badTypeVariables(DoFn<InputT, OutputT>.ProcessContext c,
- WindowingInternals<InputT, InputT> i1) {}
+ DoFn.InputProvider<InputT> input, DoFn.OutputReceiver<InputT> output) {}
@Test
public void testBadTypeVariables() throws Exception {
- Method method = getClass().getDeclaredMethod("badTypeVariables",
- DoFn.ProcessContext.class, WindowingInternals.class);
+ Method method =
+ getClass()
+ .getDeclaredMethod(
+ "badTypeVariables",
+ DoFn.ProcessContext.class,
+ DoFn.InputProvider.class,
+ DoFn.OutputReceiver.class);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("Incompatible generics in context parameter "
- + "WindowingInternals<InputT, InputT> for method " + getClass().getName()
- + "#badTypeVariables(ProcessContext, WindowingInternals). Should be "
- + "WindowingInternals<InputT, OutputT>");
+ + "OutputReceiver<InputT> for method " + getClass().getName()
+ + "#badTypeVariables(ProcessContext, InputProvider, OutputReceiver). Should be "
+ + "OutputReceiver<OutputT>");
DoFnReflector.verifyProcessMethodArguments(method);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/20208d68/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
index 233b8be..91ecd16 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
@@ -58,18 +58,7 @@ public class DoFnReflectorBenchmark {
private StubDoFnProcessContext stubDoFnContext =
new StubDoFnProcessContext(doFn, ELEMENT);
private ExtraContextFactory<String, String> extraContextFactory =
- new ExtraContextFactory<String, String>() {
-
- @Override
- public BoundedWindow window() {
- return null;
- }
-
- @Override
- public WindowingInternals<String, String> windowingInternals() {
- return null;
- }
- };
+ new DoFn.FakeExtraContextFactory<>();
private DoFnReflector doFnReflector;
private OldDoFn<String, String> adaptedDoFnWithContext;