You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:04 UTC
[04/10] incubator-beam git commit: Removes code for wrapping DoFn as
an OldDoFn
Removes code for wrapping DoFn as an OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a22de150
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a22de150
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a22de150
Branch: refs/heads/master
Commit: a22de15012c51e8b7e31143021f0a298e093bf51
Parents: e9e53c5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:21:40 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/transforms/DoFnAdapters.java | 150 ----------
.../org/apache/beam/sdk/transforms/OldDoFn.java | 295 +------------------
.../sdk/transforms/reflect/DoFnInvokers.java | 141 +--------
.../transforms/reflect/DoFnInvokersTest.java | 36 ---
4 files changed, 11 insertions(+), 611 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index e15b08b..d1c40a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.transforms;
import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
@@ -38,7 +36,6 @@ import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -53,18 +50,6 @@ public class DoFnAdapters {
/** Should not be instantiated. */
private DoFnAdapters() {}
- /**
- * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
- * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
- */
- public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
- if (fn instanceof SimpleDoFnAdapter) {
- return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
- } else {
- return fn.getClass();
- }
- }
-
/** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
@SuppressWarnings({"unchecked", "rawtypes"})
public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
@@ -76,126 +61,6 @@ public class DoFnAdapters {
}
}
- /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
- public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
- OldDoFn<InputT, OutputT> fn,
- final DoFn<InputT, OutputT>.ProcessContext c,
- final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
- return fn.new ProcessContext() {
- @Override
- public InputT element() {
- return c.element();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return c.sideInput(view);
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return extra.window();
- }
-
- @Override
- public PaneInfo pane() {
- return c.pane();
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return extra.windowingInternals();
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- c.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- c.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- c.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- c.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return c.createAggregator(name, combiner);
- }
- };
- }
-
- /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
- public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
- OldDoFn<InputT, OutputT> fn,
- final DoFn<InputT, OutputT>.Context c) {
- return fn.new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- c.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- c.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- c.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- c.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return c.createAggregator(name, combiner);
- }
- };
- }
-
- /**
- * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
- * returns {@code null}.
- */
- @Nullable
- public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
- if (fn instanceof SimpleDoFnAdapter) {
- return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
- } else {
- return null;
- }
- }
-
/**
* Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
* OldDoFn}.
@@ -238,21 +103,6 @@ public class DoFnAdapters {
}
@Override
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return fn.getInputTypeDescriptor();
- }
-
- @Override
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return fn.getOutputTypeDescriptor();
- }
-
- @Override
- Collection<Aggregator<?, ?>> getAggregators() {
- return fn.getAggregators();
- }
-
- @Override
public Duration getAllowedTimestampSkew() {
return fn.getAllowedTimestampSkew();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index 2d2c1fd..0aef552 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -71,21 +70,6 @@ import org.joda.time.Instant;
*/
@Deprecated
public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
-
- public DoFn<InputT, OutputT> toDoFn() {
- DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(this);
- if (doFn != null) {
- return doFn;
- }
- if (this instanceof RequiresWindowAccess) {
- // No parameters as it just accesses `this`
- return new AdaptedRequiresWindowAccessDoFn();
- } else {
- // No parameters as it just accesses `this`
- return new AdaptedDoFn();
- }
- }
-
/**
* Information accessible to all methods in this {@code OldDoFn}.
* Used primarily to output elements.
@@ -334,7 +318,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
this(new HashMap<String, DelegatingAggregator<?, ?>>());
}
- OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+ public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
this.aggregators = aggregators;
}
@@ -419,32 +403,6 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
/////////////////////////////////////////////////////////////////////////////
/**
- * Returns a {@link TypeDescriptor} capturing what is known statically
- * about the input type of this {@code OldDoFn} instance's most-derived
- * class.
- *
- * <p>See {@link #getOutputTypeDescriptor} for more discussion.
- */
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return new TypeDescriptor<InputT>(getClass()) {};
- }
-
- /**
- * Returns a {@link TypeDescriptor} capturing what is known statically
- * about the output type of this {@code OldDoFn} instance's
- * most-derived class.
- *
- * <p>In the normal case of a concrete {@code OldDoFn} subclass with
- * no generic type parameters of its own (including anonymous inner
- * classes), this will be a complete non-generic type, which is good
- * for choosing a default output {@code Coder<OutputT>} for the output
- * {@code PCollection<OutputT>}.
- */
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return new TypeDescriptor<OutputT>(getClass()) {};
- }
-
- /**
* Returns an {@link Aggregator} with aggregation logic specified by the
* {@link CombineFn} argument. The name provided must be unique across
* {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
@@ -504,255 +462,4 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
Collection<Aggregator<?, ?>> getAggregators() {
return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
}
-
- /**
- * A {@link Context} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
- */
- private class AdaptedContext extends Context {
-
- private final DoFn<InputT, OutputT>.Context newContext;
-
- public AdaptedContext(
- DoFn<InputT, OutputT>.Context newContext) {
- this.newContext = newContext;
- super.setupDelegateAggregators();
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return newContext.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- newContext.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- newContext.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- newContext.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- newContext.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return newContext.createAggregator(name, combiner);
- }
- }
-
- /**
- * A {@link ProcessContext} for an {@link OldDoFn} via a context for a proper {@link DoFn}.
- */
- private class AdaptedProcessContext extends ProcessContext {
-
- private final DoFn<InputT, OutputT>.ProcessContext newContext;
-
- public AdaptedProcessContext(DoFn<InputT, OutputT>.ProcessContext newContext) {
- this.newContext = newContext;
- }
-
- @Override
- public InputT element() {
- return newContext.element();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return newContext.sideInput(view);
- }
-
- @Override
- public Instant timestamp() {
- return newContext.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- throw new UnsupportedOperationException(String.format(
- "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
- OldDoFn.class.getSimpleName(),
- OldDoFn.ProcessContext.class.getSimpleName(),
- OldDoFn.class.getSimpleName(),
- DoFn.class.getSimpleName()));
- }
-
- @Override
- public PaneInfo pane() {
- return newContext.pane();
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- throw new UnsupportedOperationException(String.format(
- "%s.%s.windowingInternals() is no longer supported. Please convert your %s to a %s",
- OldDoFn.class.getSimpleName(),
- OldDoFn.ProcessContext.class.getSimpleName(),
- OldDoFn.class.getSimpleName(),
- DoFn.class.getSimpleName()));
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return newContext.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- newContext.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- newContext.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- newContext.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- newContext.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return newContext.createAggregator(name, combiner);
- }
- }
-
- private class AdaptedDoFn extends DoFn<InputT, OutputT> {
-
- @Setup
- public void setup() throws Exception {
- OldDoFn.this.setup();
- }
-
- @StartBundle
- public void startBundle(Context c) throws Exception {
- OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- OldDoFn.this.processElement(OldDoFn.this.new AdaptedProcessContext(c));
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
- }
-
- @Teardown
- public void teardown() throws Exception {
- OldDoFn.this.teardown();
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return OldDoFn.this.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- OldDoFn.this.populateDisplayData(builder);
- }
-
- @Override
- public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return OldDoFn.this.getInputTypeDescriptor();
- }
-
- @Override
- Collection<Aggregator<?, ?>> getAggregators() {
- return OldDoFn.this.getAggregators();
- }
-
- @Override
- public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return OldDoFn.this.getOutputTypeDescriptor();
- }
- }
-
- /**
- * A {@link ProcessContext} for an {@link OldDoFn} that implements
- * {@link RequiresWindowAccess}, via a context for a proper {@link DoFn}.
- */
- private class AdaptedRequiresWindowAccessProcessContext extends AdaptedProcessContext {
-
- private final BoundedWindow window;
-
- public AdaptedRequiresWindowAccessProcessContext(
- DoFn<InputT, OutputT>.ProcessContext newContext,
- BoundedWindow window) {
- super(newContext);
- this.window = window;
- }
-
- @Override
- public BoundedWindow window() {
- return window;
- }
- }
-
- private class AdaptedRequiresWindowAccessDoFn extends DoFn<InputT, OutputT> {
-
- @Setup
- public void setup() throws Exception {
- OldDoFn.this.setup();
- }
-
- @StartBundle
- public void startBundle(Context c) throws Exception {
- OldDoFn.this.startBundle(OldDoFn.this.new AdaptedContext(c));
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- OldDoFn.this.processElement(
- OldDoFn.this.new AdaptedRequiresWindowAccessProcessContext(c, window));
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- OldDoFn.this.finishBundle(OldDoFn.this.new AdaptedContext(c));
- }
-
- @Teardown
- public void teardown() throws Exception {
- OldDoFn.this.teardown();
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return OldDoFn.this.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- OldDoFn.this.populateDisplayData(builder);
- }
-
- @Override
- public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return OldDoFn.this.getInputTypeDescriptor();
- }
-
- @Override
- public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return OldDoFn.this.getOutputTypeDescriptor();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index 50a7082..b141d51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -18,13 +18,7 @@
package org.apache.beam.sdk.transforms.reflect;
import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFnAdapters;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.util.UserCodeException;
/** Static utilities for working with {@link DoFnInvoker}. */
public class DoFnInvokers {
@@ -42,137 +36,22 @@ public class DoFnInvokers {
return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn);
}
- private DoFnInvokers() {}
-
/**
- * Returns a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link
- * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an
- * {@link Object} and then pass it to this method, so there is no need to statically specify what
- * sort of object it is.
+ * Temporarily retained for compatibility with Dataflow worker.
+ * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}.
*
- * @deprecated this is to be used only as a migration path for decoupling upgrades
+ * @deprecated Use {@link #invokerFor(DoFn)}.
*/
+ @SuppressWarnings("unchecked")
@Deprecated
- public static DoFnInvoker<?, ?> invokerFor(Serializable deserializedFn) {
+ public static <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
+ Serializable deserializedFn) {
if (deserializedFn instanceof DoFn) {
- return invokerFor((DoFn<?, ?>) deserializedFn);
- } else if (deserializedFn instanceof OldDoFn) {
- return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Cannot create a %s for %s; it should be either a %s or an %s.",
- DoFnInvoker.class.getSimpleName(),
- deserializedFn.toString(),
- DoFn.class.getSimpleName(),
- OldDoFn.class.getSimpleName()));
+ return invokerFor((DoFn<InputT, OutputT>) deserializedFn);
}
+ throw new UnsupportedOperationException(
+ "Only DoFn supported, was: " + deserializedFn.getClass());
}
- /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
- @Deprecated public static final DoFnInvokers INSTANCE = new DoFnInvokers();
-
- /** @deprecated use {@link DoFnInvokers#invokerFor(DoFn)}. */
- @Deprecated
- public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(Object deserializedFn) {
- return (DoFnInvoker<InputT, OutputT>) DoFnInvokers.invokerFor((Serializable) deserializedFn);
- }
-
-
- static class OldDoFnInvoker<InputT, OutputT> implements DoFnInvoker<InputT, OutputT> {
-
- private final OldDoFn<InputT, OutputT> fn;
-
- public OldDoFnInvoker(OldDoFn<InputT, OutputT> fn) {
- this.fn = fn;
- }
-
- @Override
- public DoFn.ProcessContinuation invokeProcessElement(
- ArgumentProvider<InputT, OutputT> extra) {
- // The outer DoFn is immaterial - it exists only to avoid typing InputT and OutputT repeatedly
- DoFn<InputT, OutputT>.ProcessContext newCtx =
- extra.processContext(new DoFn<InputT, OutputT>() {});
- OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
- DoFnAdapters.adaptProcessContext(fn, newCtx, extra);
- try {
- fn.processElement(oldCtx);
- return DoFn.ProcessContinuation.stop();
- } catch (Throwable exc) {
- throw UserCodeException.wrap(exc);
- }
- }
-
- @Override
- public void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments) {
- throw new UnsupportedOperationException(
- String.format("Timers are not supported for %s", OldDoFn.class.getSimpleName()));
- }
-
- @Override
- public void invokeStartBundle(DoFn.Context c) {
- OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
- try {
- fn.startBundle(oldCtx);
- } catch (Throwable exc) {
- throw UserCodeException.wrap(exc);
- }
- }
-
- @Override
- public void invokeFinishBundle(DoFn.Context c) {
- OldDoFn<InputT, OutputT>.Context oldCtx = DoFnAdapters.adaptContext(fn, c);
- try {
- fn.finishBundle(oldCtx);
- } catch (Throwable exc) {
- throw UserCodeException.wrap(exc);
- }
- }
-
- @Override
- public void invokeSetup() {
- try {
- fn.setup();
- } catch (Throwable exc) {
- throw UserCodeException.wrap(exc);
- }
- }
-
- @Override
- public void invokeTeardown() {
- try {
- fn.teardown();
- } catch (Throwable exc) {
- throw UserCodeException.wrap(exc);
- }
- }
-
- @Override
- public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) {
- throw new UnsupportedOperationException("OldDoFn is not splittable");
- }
-
- @Override
- public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
- CoderRegistry coderRegistry) {
- throw new UnsupportedOperationException("OldDoFn is not splittable");
- }
-
- @Override
- public <RestrictionT> void invokeSplitRestriction(
- InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
- throw new UnsupportedOperationException("OldDoFn is not splittable");
- }
-
- @Override
- public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
- TrackerT invokeNewTracker(RestrictionT restriction) {
- throw new UnsupportedOperationException("OldDoFn is not splittable");
- }
-
- @Override
- public DoFn<InputT, OutputT> getFn() {
- throw new UnsupportedOperationException("getFn is not supported for OldDoFn");
- }
- }
+ private DoFnInvokers() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a22de150/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 456a6eb..55b8e7e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -738,39 +737,4 @@ public class DoFnInvokersTest {
invoker.invokeOnTimer(timerId, mockArgumentProvider);
assertThat(fn.window, equalTo(testWindow));
}
-
- private class OldDoFnIdentity extends OldDoFn<String, String> {
- public void processElement(ProcessContext c) {}
- }
-
- @Test
- public void testOldDoFnProcessElement() throws Exception {
- new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn)
- .invokeProcessElement(mockArgumentProvider);
- verify(mockOldDoFn).processElement(any(OldDoFn.ProcessContext.class));
- }
-
- @Test
- public void testOldDoFnStartBundle() throws Exception {
- new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeStartBundle(mockProcessContext);
- verify(mockOldDoFn).startBundle(any(OldDoFn.Context.class));
- }
-
- @Test
- public void testOldDoFnFinishBundle() throws Exception {
- new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeFinishBundle(mockProcessContext);
- verify(mockOldDoFn).finishBundle(any(OldDoFn.Context.class));
- }
-
- @Test
- public void testOldDoFnSetup() throws Exception {
- new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeSetup();
- verify(mockOldDoFn).setup();
- }
-
- @Test
- public void testOldDoFnTeardown() throws Exception {
- new DoFnInvokers.OldDoFnInvoker<>(mockOldDoFn).invokeTeardown();
- verify(mockOldDoFn).teardown();
- }
}