You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/07 03:18:39 UTC
[1/2] incubator-beam git commit: Add DoFnInvoker for OldDoFn,
for migration ease
Repository: incubator-beam
Updated Branches:
refs/heads/master 9b71f1636 -> 03b89c065
Add DoFnInvoker for OldDoFn, for migration ease
This allows any runner to use DoFnInvokers.invokerFor(Object) to be
agnostic as to whether they are running a DoFn or OldDoFn. Thus,
the migration of the runner can occur in advance of further changes
to the SDK and deployment can be independent. For example, a backend
need not know whether it is deserializing a DoFn or OldDoFn.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2a24f3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2a24f3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2a24f3c
Branch: refs/heads/master
Commit: e2a24f3c2668c7341b38cc56d331cefd3a69f27f
Parents: 8462acb
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 4 13:56:13 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 6 20:16:09 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 40 ++++++
.../beam/sdk/transforms/DoFnAdapters.java | 142 ++++++++++++++++++-
.../sdk/transforms/reflect/DoFnInvokers.java | 86 +++++++++++
.../transforms/reflect/DoFnInvokersTest.java | 46 ++++++
.../transforms/DoFnInvokersBenchmark.java | 7 +
5 files changed, 318 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 59c8323..fb7fbd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -29,6 +29,8 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
@@ -37,6 +39,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -185,6 +188,23 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*/
public abstract <T> void sideOutputWithTimestamp(
TupleTag<T> tag, T output, Instant timestamp);
+
+ /**
+ * Creates an {@link Aggregator} in the {@link DoFn} context with the specified name and
+ * aggregation logic specified by {@link CombineFn}. This is to be overridden by a particular
+ * runner context with an implementation that delivers the values as appropriate.
+ *
+ * <p>The aggregators declared on the {@link DoFn} will be wired up to aggregators allocated via
+ * this method.
+ *
+ * @param name the name of the aggregator
+ * @param combiner the {@link CombineFn} to use in the aggregator
+ * @return an aggregator for the provided name and {@link CombineFn} in this context
+ */
+ @Experimental(Kind.AGGREGATOR)
+ protected abstract <AggInputT, AggOutputT>
+ Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
}
/**
@@ -306,6 +326,21 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* A placeholder for testing purposes.
*/
OutputReceiver<OutputT> outputReceiver();
+
+ /**
+ * For migration from {@link OldDoFn} to {@link DoFn}, provide
+ * a {@link WindowingInternals} so an {@link OldDoFn} can be run
+ * via {@link DoFnInvoker}.
+ *
+ * <p>This is <i>not</i> exposed via the reflective capabilities
+ * of {@link DoFn}.
+ *
+ * @deprecated Please port occurrences of {@link OldDoFn} to {@link DoFn}. If they require
+ * state and timers, they will need to wait for the arrival of those features. Do not introduce
+ * new uses of this method.
+ */
+ @Deprecated
+ WindowingInternals<InputT, OutputT> windowingInternals();
}
/** A placeholder for testing handling of output types during {@link DoFn} reflection. */
@@ -335,6 +370,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
public OutputReceiver<OutputT> outputReceiver() {
return null;
}
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return null;
+ }
}
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 77a71e9..7b259aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import java.io.IOException;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -33,7 +35,7 @@ import org.joda.time.Duration;
import org.joda.time.Instant;
/**
- * Utility class containing adapters for running a {@link DoFn} as an {@link OldDoFn}.
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
*
* @deprecated This class will go away when we start running {@link DoFn}'s directly (using
* {@link DoFnInvoker}) rather than via {@link OldDoFn}.
@@ -65,6 +67,113 @@ public class DoFnAdapters {
}
}
+ /** Creates an {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
+ OldDoFn<InputT, OutputT> fn,
+ final DoFn<InputT, OutputT>.ProcessContext c,
+ final DoFn.ExtraContextFactory<InputT, OutputT> extra) {
+ return fn.new ProcessContext() {
+ @Override
+ public InputT element() {
+ return c.element();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return c.sideInput(view);
+ }
+
+ @Override
+ public Instant timestamp() {
+ return c.timestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return extra.window();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return c.pane();
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return extra.windowingInternals();
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ c.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ c.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ c.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ c.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return c.createAggregator(name, combiner);
+ }
+ };
+ }
+
+ /** Creates an {@link OldDoFn.Context} from a {@link DoFn.Context}. */
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
+ OldDoFn<InputT, OutputT> fn,
+ final DoFn<InputT, OutputT>.Context c) {
+ return fn.new Context() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ c.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ c.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ c.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ c.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return c.createAggregator(name, combiner);
+ }
+ };
+ }
+
/**
* Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
* OldDoFn}.
@@ -183,10 +292,26 @@ public class DoFnAdapters {
}
@Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name,
+ CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return context.createAggregatorInternal(name, combiner);
+ }
+
+ @Override
public BoundedWindow window() {
- // The DoFn doesn't allow us to ask for these outside ProcessElements, so this
+ // The OldDoFn doesn't allow us to ask for these outside processElement, so this
// should be unreachable.
- throw new UnsupportedOperationException("Can only get the window in ProcessElements");
+ throw new UnsupportedOperationException(
+ "Can only get the window in processElement; elsewhere there is no defined window.");
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ // The OldDoFn doesn't allow us to ask for these outside processElement, so this
+ // should be unreachable.
+ throw new UnsupportedOperationException(
+ "Can only get WindowingInternals in processElement");
}
@Override
@@ -247,6 +372,12 @@ public class DoFnAdapters {
}
@Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return context.createAggregatorInternal(name, combiner);
+ }
+
+ @Override
public InputT element() {
return context.element();
}
@@ -267,6 +398,11 @@ public class DoFnAdapters {
}
@Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return context.windowingInternals();
+ }
+
+ @Override
public DoFn.InputProvider<InputT> inputProvider() {
throw new UnsupportedOperationException("inputProvider() exists only for testing");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index edc1dc0..041eb60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -58,7 +58,10 @@ import net.bytebuddy.jar.asm.Opcodes;
import net.bytebuddy.jar.asm.Type;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.UserCodeException;
/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
@@ -77,6 +80,89 @@ public class DoFnInvokers {
private DoFnInvokers() {}
+ /**
+ * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a
+ * {@link DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's
+ * function as an {@link Object} and pass it here without statically knowing its type.
+ *
+ * @throws IllegalArgumentException if the argument is {@code null} or neither kind of function
+ * @deprecated this is to be used only as a migration path for decoupling upgrades
+ */
+ @Deprecated
+ public DoFnInvoker<?, ?> invokerFor(Object deserializedFn) {
+ if (deserializedFn instanceof DoFn) {
+ return newByteBuddyInvoker((DoFn<?, ?>) deserializedFn);
+ } else if (deserializedFn instanceof OldDoFn) {
+ return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Cannot create a %s for %s; it should be either a %s or an %s.",
+ DoFnInvoker.class.getSimpleName(),
+ deserializedFn, // passed to %s directly so a null argument is reported here, not as an NPE
+ DoFn.class.getSimpleName(),
+ OldDoFn.class.getSimpleName()));
+ }
+ }
+
+ static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> {
+
+ private final OldDoFn<InputT, OutputT> fn;
+
+ public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
+ this.fn = fn;
+ }
+
+ @Override
+ public void invokeProcessElement(
+ DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) {
+ OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
+ DoFnAdapters.adaptProcessContext(fn, c, extra);
+ try {
+ fn.processElement(oldCtx);
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeStartBundle(DoFn.Context c) {
+ OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
+ try {
+ fn.startBundle(oldCtx);
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeFinishBundle(DoFn.Context c) {
+ OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
+ try {
+ fn.finishBundle(oldCtx);
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeSetup() {
+ try {
+ fn.setup();
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+
+ @Override
+ public void invokeTeardown() {
+ try {
+ fn.teardown();
+ } catch (Throwable exc) {
+ throw UserCodeException.wrap(exc);
+ }
+ }
+ }
+
/** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
DoFn<InputT, OutputT> fn) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index e59cce8..97d810c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -18,13 +18,16 @@
package org.apache.beam.sdk.transforms.reflect;
import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowingInternals;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -43,6 +46,9 @@ public class DoFnInvokersTest {
@Mock private BoundedWindow mockWindow;
@Mock private DoFn.InputProvider<String> mockInputProvider;
@Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
+ @Mock private WindowingInternals<String, String> mockWindowingInternals;
+
+ @Mock private OldDoFn<String, String> mockOldDoFn;
private DoFn.ExtraContextFactory<String, String> extraContextFactory;
@@ -65,6 +71,11 @@ public class DoFnInvokersTest {
public DoFn.OutputReceiver<String> outputReceiver() {
return mockOutputReceiver;
}
+
+ @Override
+ public WindowingInternals<String, String> windowingInternals() {
+ return mockWindowingInternals;
+ }
};
}
@@ -326,4 +337,39 @@ public class DoFnInvokersTest {
thrown.expectMessage("bogus");
invoker.invokeFinishBundle(null);
}
+
+ private class OldDoFnIdentity extends OldDoFn<String, String> {
+ public void processElement(ProcessContext c) {}
+ }
+
+ @Test
+ public void testOldDoFnProcessElement() throws Exception {
+ new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn)
+ .invokeProcessElement(mockContext, extraContextFactory);
+ verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
+ }
+
+ @Test
+ public void testOldDoFnStartBundle() throws Exception {
+ new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockContext);
+ verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class));
+ }
+
+ @Test
+ public void testOldDoFnFinishBundle() throws Exception {
+ new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockContext);
+ verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class));
+ }
+
+ @Test
+ public void testOldDoFnSetup() throws Exception {
+ new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup();
+ verify(mockOldDoFn).setup();
+ }
+
+ @Test
+ public void testOldDoFnTeardown() throws Exception {
+ new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown();
+ verify(mockOldDoFn).teardown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2a24f3c/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
index a574ed8..80324b9 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnInvokersBenchmark.java
@@ -220,5 +220,12 @@ public class DoFnInvokersBenchmark {
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {}
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name,
+ CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return null;
+ }
}
}
[2/2] incubator-beam git commit: This closes #1049
Posted by ke...@apache.org.
This closes #1049
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/03b89c06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/03b89c06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/03b89c06
Branch: refs/heads/master
Commit: 03b89c065aa7a0a6d86aeb105dc9164f071e28a5
Parents: 9b71f16 e2a24f3
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 6 20:18:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 6 20:18:33 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/DoFn.java | 40 ++++++
.../beam/sdk/transforms/DoFnAdapters.java | 142 ++++++++++++++++++-
.../sdk/transforms/reflect/DoFnInvokers.java | 86 +++++++++++
.../transforms/reflect/DoFnInvokersTest.java | 46 ++++++
.../transforms/DoFnInvokersBenchmark.java | 7 +
5 files changed, 318 insertions(+), 3 deletions(-)
----------------------------------------------------------------------