You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:14 UTC
[06/50] incubator-beam git commit: Revert "Moves DoFnAdapters to runners-core"
Revert "Moves DoFnAdapters to runners-core"
This reverts commit 33ed3238e2b3899cff061be3056c5cc29fc60a04.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/954e57d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/954e57d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/954e57d7
Branch: refs/heads/gearpump-runner
Commit: 954e57d7696fd14f7d1015f4e40f025ef8538802
Parents: 4aa0ee1
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 16 15:37:02 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Dec 16 16:39:20 2016 -0800
----------------------------------------------------------------------
.../apex/translation/WindowBoundTranslator.java | 2 +-
.../operators/ApexGroupByKeyOperator.java | 2 +-
.../operators/ApexParDoOperator.java | 2 +-
.../apache/beam/runners/core/DoFnAdapters.java | 508 -------------------
.../beam/runners/core/SimpleOldDoFnRunner.java | 4 +-
.../core/GroupAlsoByWindowsProperties.java | 2 +-
.../functions/FlinkDoFnFunction.java | 2 +-
.../functions/FlinkMultiOutputDoFnFunction.java | 2 +-
.../functions/FlinkProcessContextBase.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../sdk/transforms/AggregatorRetriever.java | 13 +-
.../beam/sdk/transforms/DoFnAdapters.java | 504 ++++++++++++++++++
.../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +-
.../apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +-
14 files changed, 518 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index ef049e1..33b9269 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -22,8 +22,8 @@ import java.util.Collections;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 4af7ff0..48ac177 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -413,7 +413,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
}
@Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 4538fb5..a3d3a97 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -38,7 +38,6 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
@@ -49,6 +48,7 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.NullSideInputReader;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
deleted file mode 100644
index fc5847c..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.io.IOException;
-import java.util.Collection;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AggregatorRetriever;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.Context;
-import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
- *
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
- * DoFnInvoker}) rather than via {@link OldDoFn}.
- */
-@Deprecated
-public class DoFnAdapters {
- /** Should not be instantiated. */
- private DoFnAdapters() {}
-
- /**
- * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
- * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
- */
- public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
- if (fn instanceof SimpleDoFnAdapter) {
- return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
- } else {
- return fn.getClass();
- }
- }
-
- /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
- @SuppressWarnings({"unchecked", "rawtypes"})
- public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
- DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
- if (signature.processElement().observesWindow()) {
- return new WindowDoFnAdapter<>(fn);
- } else {
- return new SimpleDoFnAdapter<>(fn);
- }
- }
-
- /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
- public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
- OldDoFn<InputT, OutputT> fn,
- final DoFn<InputT, OutputT>.ProcessContext c,
- final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
- return fn.new ProcessContext() {
- @Override
- public InputT element() {
- return c.element();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return c.sideInput(view);
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return extra.window();
- }
-
- @Override
- public PaneInfo pane() {
- return c.pane();
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return extra.windowingInternals();
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- c.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- c.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- c.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- c.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return c.createAggregator(name, combiner);
- }
- };
- }
-
- /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
- public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
- OldDoFn<InputT, OutputT> fn,
- final DoFn<InputT, OutputT>.Context c) {
- return fn.new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- c.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- c.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- c.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- c.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return c.createAggregator(name, combiner);
- }
- };
- }
-
- /**
- * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
- * returns {@code null}.
- */
- @Nullable
- public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
- if (fn instanceof SimpleDoFnAdapter) {
- return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
- } else {
- return null;
- }
- }
-
- /**
- * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
- * OldDoFn}.
- */
- private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
- private final DoFn<InputT, OutputT> fn;
- private transient DoFnInvoker<InputT, OutputT> invoker;
-
- SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
- super(AggregatorRetriever.getDelegatingAggregators(fn));
- this.fn = fn;
- this.invoker = DoFnInvokers.invokerFor(fn);
- }
-
- @Override
- public void setup() throws Exception {
- this.invoker.invokeSetup();
- }
-
- @Override
- public void startBundle(Context c) throws Exception {
- fn.prepareForProcessing();
- invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
- }
-
- @Override
- public void finishBundle(Context c) throws Exception {
- invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
- }
-
- @Override
- public void teardown() throws Exception {
- this.invoker.invokeTeardown();
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
- invoker.invokeProcessElement(adapter);
- }
-
- @Override
- protected TypeDescriptor<InputT> getInputTypeDescriptor() {
- return fn.getInputTypeDescriptor();
- }
-
- @Override
- protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return fn.getOutputTypeDescriptor();
- }
-
- @Override
- Collection<Aggregator<?, ?>> getAggregators() {
- return fn.getAggregators();
- }
-
- @Override
- public Duration getAllowedTimestampSkew() {
- return fn.getAllowedTimestampSkew();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(fn);
- }
-
- private void readObject(java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- this.invoker = DoFnInvokers.invokerFor(fn);
- }
- }
-
- /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
- private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
- implements OldDoFn.RequiresWindowAccess {
-
- WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
- super(fn);
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
- * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
- * unavailable.
- */
- private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.Context context;
-
- private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
- fn.super();
- this.context = context;
- super.setupDelegateAggregators();
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
- String name,
- CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
-
- @Override
- public BoundedWindow window() {
- // The OldDoFn doesn't allow us to ask for these outside processElement, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get the window in processElement; elsewhere there is no defined window.");
- }
-
- @Override
- public Context context(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Can only get a ProcessContext in processElement");
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Timers are not supported for OldDoFn");
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get WindowingInternals in processElement");
- }
-
- @Override
- public DoFn.InputProvider<InputT> inputProvider() {
- throw new UnsupportedOperationException("inputProvider() exists only for testing");
- }
-
- @Override
- public DoFn.OutputReceiver<OutputT> outputReceiver() {
- throw new UnsupportedOperationException("outputReceiver() exists only for testing");
- }
-
- @Override
- public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
- }
-
- /**
- * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method.
- */
- private static class ProcessContextAdapter<InputT, OutputT>
- extends DoFn<InputT, OutputT>.ProcessContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-
- private OldDoFn<InputT, OutputT>.ProcessContext context;
-
- private ProcessContextAdapter(
- DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
- fn.super();
- this.context = context;
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return context.getPipelineOptions();
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view) {
- return context.sideInput(view);
- }
-
- @Override
- public void output(OutputT output) {
- context.output(output);
- }
-
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- context.outputWithTimestamp(output, timestamp);
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- context.sideOutput(tag, output);
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- context.sideOutputWithTimestamp(tag, output, timestamp);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return context.createAggregatorInternal(name, combiner);
- }
-
- @Override
- public InputT element() {
- return context.element();
- }
-
- @Override
- public Instant timestamp() {
- return context.timestamp();
- }
-
- @Override
- public PaneInfo pane() {
- return context.pane();
- }
-
- @Override
- public BoundedWindow window() {
- return context.window();
- }
-
- @Override
- public Context context(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return context.windowingInternals();
- }
-
- @Override
- public DoFn.InputProvider<InputT> inputProvider() {
- throw new UnsupportedOperationException("inputProvider() exists only for testing");
- }
-
- @Override
- public DoFn.OutputReceiver<OutputT> outputReceiver() {
- throw new UnsupportedOperationException("outputReceiver() exists only for testing");
- }
-
- @Override
- public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
- throw new UnsupportedOperationException("This is a non-splittable DoFn");
- }
-
- @Override
- public State state(String stateId) {
- throw new UnsupportedOperationException("State is not supported by this runner");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException("Timers are not supported by this runner");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 7d93200..1048fdc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -322,7 +322,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null");
return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner);
@@ -504,7 +504,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT
}
@Override
- public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
+ protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT>
createAggregatorInternal(
String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) {
return context.createAggregatorInternal(name, combiner);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index ef01106..97b67c6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -744,7 +744,7 @@ public class GroupAlsoByWindowsProperties {
}
@Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 2a4a68e..ed200d5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -18,10 +18,10 @@
package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index a97bd46..7f6a436 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -18,10 +18,10 @@
package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 53b9803..6afca38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -252,7 +252,7 @@ abstract class FlinkProcessContextBase<InputT, OutputT>
public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
@Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
@SuppressWarnings("unchecked")
SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 001e3b6..8704308 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
@@ -42,6 +41,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index b1d3ead..ce47e22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -18,10 +18,9 @@
package org.apache.beam.sdk.transforms;
import java.util.Collection;
-import java.util.Map;
/**
- * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}.
+ * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}.
*/
public final class AggregatorRetriever {
private AggregatorRetriever() {
@@ -29,17 +28,9 @@ public final class AggregatorRetriever {
}
/**
- * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}.
+ * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
*/
public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
return fn.getAggregators();
}
-
- /**
- * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link
- * DoFn}.
- */
- public static Map<String, DelegatingAggregator<?, ?>> getDelegatingAggregators(DoFn<?, ?> fn) {
- return fn.aggregators;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
new file mode 100644
index 0000000..e15b08b
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.IOException;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn.Context;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
+ *
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
+ * DoFnInvoker}) rather than via {@link OldDoFn}.
+ */
+@Deprecated
+public class DoFnAdapters {
+ /** Should not be instantiated. */
+ private DoFnAdapters() {}
+
+ /**
+ * If this is an {@link OldDoFn} produced via {@link #toOldDoFn}, returns the class of the
+ * original {@link DoFn}, otherwise returns {@code fn.getClass()}.
+ */
+ public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) {
+ if (fn instanceof SimpleDoFnAdapter) {
+ return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass();
+ } else {
+ return fn.getClass();
+ }
+ }
+
+ /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) {
+ DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
+ if (signature.processElement().observesWindow()) {
+ return new WindowDoFnAdapter<>(fn);
+ } else {
+ return new SimpleDoFnAdapter<>(fn);
+ }
+ }
+
+ /** Creates a {@link OldDoFn.ProcessContext} from a {@link DoFn.ProcessContext}. */
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT>.ProcessContext adaptProcessContext(
+ OldDoFn<InputT, OutputT> fn,
+ final DoFn<InputT, OutputT>.ProcessContext c,
+ final DoFnInvoker.ArgumentProvider<InputT, OutputT> extra) {
+ return fn.new ProcessContext() {
+ @Override
+ public InputT element() {
+ return c.element();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return c.sideInput(view);
+ }
+
+ @Override
+ public Instant timestamp() {
+ return c.timestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return extra.window();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return c.pane();
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return extra.windowingInternals();
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ c.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ c.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ c.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ c.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return c.createAggregator(name, combiner);
+ }
+ };
+ }
+
+ /** Creates a {@link OldDoFn.Context} from a {@link DoFn.Context}. */
+ public static <InputT, OutputT> OldDoFn<InputT, OutputT>.Context adaptContext(
+ OldDoFn<InputT, OutputT> fn,
+ final DoFn<InputT, OutputT>.Context c) {
+ return fn.new Context() {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ c.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ c.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ c.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ c.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return c.createAggregator(name, combiner);
+ }
+ };
+ }
+
+ /**
+ * If the fn was created using {@link #toOldDoFn}, returns the original {@link DoFn}. Otherwise,
+ * returns {@code null}.
+ */
+ @Nullable
+ public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
+ if (fn instanceof SimpleDoFnAdapter) {
+ return ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
+ * OldDoFn}.
+ */
+ private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+ private final DoFn<InputT, OutputT> fn;
+ private transient DoFnInvoker<InputT, OutputT> invoker;
+
+ SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) {
+ super(fn.aggregators);
+ this.fn = fn;
+ this.invoker = DoFnInvokers.invokerFor(fn);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ this.invoker.invokeSetup();
+ }
+
+ @Override
+ public void startBundle(Context c) throws Exception {
+ fn.prepareForProcessing();
+ invoker.invokeStartBundle(new ContextAdapter<>(fn, c));
+ }
+
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ invoker.invokeFinishBundle(new ContextAdapter<>(fn, c));
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ this.invoker.invokeTeardown();
+ }
+
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c);
+ invoker.invokeProcessElement(adapter);
+ }
+
+ @Override
+ protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return fn.getInputTypeDescriptor();
+ }
+
+ @Override
+ protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return fn.getOutputTypeDescriptor();
+ }
+
+ @Override
+ Collection<Aggregator<?, ?>> getAggregators() {
+ return fn.getAggregators();
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return fn.getAllowedTimestampSkew();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(fn);
+ }
+
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.invoker = DoFnInvokers.invokerFor(fn);
+ }
+ }
+
+ /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */
+ private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT>
+ implements OldDoFn.RequiresWindowAccess {
+
+ WindowDoFnAdapter(DoFn<InputT, OutputT> fn) {
+ super(fn);
+ }
+ }
+
+ /**
+ * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link
+ * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is
+ * unavailable.
+ */
+ private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+ private OldDoFn<InputT, OutputT>.Context context;
+
+ private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) {
+ fn.super();
+ this.context = context;
+ super.setupDelegateAggregators();
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name,
+ CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return context.createAggregatorInternal(name, combiner);
+ }
+
+ @Override
+ public BoundedWindow window() {
+ // The OldDoFn doesn't allow us to ask for these outside processElement, so this
+ // should be unreachable.
+ throw new UnsupportedOperationException(
+ "Can only get the window in processElement; elsewhere there is no defined window.");
+ }
+
+ @Override
+ public Context context(DoFn<InputT, OutputT> doFn) {
+ return this;
+ }
+
+ @Override
+ public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Can only get a ProcessContext in processElement");
+ }
+
+ @Override
+ public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException(
+ "Timers are not supported for OldDoFn");
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ // The OldDoFn doesn't allow us to ask for these outside processElement, so this
+ // should be unreachable.
+ throw new UnsupportedOperationException(
+ "Can only get WindowingInternals in processElement");
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+ }
+
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ throw new UnsupportedOperationException("This is a non-splittable DoFn");
+ }
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException("State is not supported by this runner");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("Timers are not supported by this runner");
+ }
+ }
+
+ /**
+ * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider}.
+ */
+ private static class ProcessContextAdapter<InputT, OutputT>
+ extends DoFn<InputT, OutputT>.ProcessContext
+ implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+ private OldDoFn<InputT, OutputT>.ProcessContext context;
+
+ private ProcessContextAdapter(
+ DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) {
+ fn.super();
+ this.context = context;
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return context.getPipelineOptions();
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return context.sideInput(view);
+ }
+
+ @Override
+ public void output(OutputT output) {
+ context.output(output);
+ }
+
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ context.outputWithTimestamp(output, timestamp);
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ context.sideOutput(tag, output);
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+ context.sideOutputWithTimestamp(tag, output, timestamp);
+ }
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+ String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return context.createAggregatorInternal(name, combiner);
+ }
+
+ @Override
+ public InputT element() {
+ return context.element();
+ }
+
+ @Override
+ public Instant timestamp() {
+ return context.timestamp();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return context.pane();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return context.window();
+ }
+
+ @Override
+ public Context context(DoFn<InputT, OutputT> doFn) {
+ return this;
+ }
+
+ @Override
+ public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+ return this;
+ }
+
+ @Override
+ public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+ throw new UnsupportedOperationException("Timers are not supported for OldDoFn");
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return context.windowingInternals();
+ }
+
+ @Override
+ public DoFn.InputProvider<InputT> inputProvider() {
+ throw new UnsupportedOperationException("inputProvider() exists only for testing");
+ }
+
+ @Override
+ public DoFn.OutputReceiver<OutputT> outputReceiver() {
+ throw new UnsupportedOperationException("outputReceiver() exists only for testing");
+ }
+
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ throw new UnsupportedOperationException("This is a non-splittable DoFn");
+ }
+
+ @Override
+ public State state(String stateId) {
+ throw new UnsupportedOperationException("State is not supported by this runner");
+ }
+
+ @Override
+ public Timer timer(String timerId) {
+ throw new UnsupportedOperationException("Timers are not supported by this runner");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
index d1bb42b..2d2c1fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -208,7 +208,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl
* context
*/
@Experimental(Kind.AGGREGATOR)
- public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/954e57d7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
index 0db130d..504480b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
@@ -63,7 +63,7 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
Instant timestamp) {
}
@Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
return null;
}