You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/20 22:38:34 UTC
[1/4] beam git commit: Moves OldDoFn to runners-core
Repository: beam
Updated Branches:
refs/heads/master 77c7505b8 -> a6810372b
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
deleted file mode 100644
index 7b04533..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
+++ /dev/null
@@ -1,465 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * The argument to {@link ParDo} providing the code to use to process
- * elements of the input
- * {@link org.apache.beam.sdk.values.PCollection}.
- *
- * <p>See {@link ParDo} for more explanation, examples of use, and
- * discussion of constraints on {@code OldDoFn}s, including their
- * serializability, lack of access to global shared mutable state,
- * requirements for failure tolerance, and benefits of optimization.
- *
- * <p>{@code OldDoFn}s can be tested in the context of a particular
- * {@code Pipeline} by running that {@code Pipeline} on sample input
- * and then checking its output. Unit testing of a {@code OldDoFn},
- * separately from any {@code ParDo} transform or {@code Pipeline},
- * can be done via the {@link DoFnTester} harness.
- *
- * <p>{@link DoFn} (currently experimental) offers an alternative
- * mechanism for accessing {@link ProcessContext#window()} without the need
- * to implement {@link RequiresWindowAccess}.
- *
- * <p>See also {@link #processElement} for details on implementing the transformation
- * from {@code InputT} to {@code OutputT}.
- *
- * @param <InputT> the type of the (main) input elements
- * @param <OutputT> the type of the (main) output elements
- * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}.
- */
-@Deprecated
-public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
- /**
- * Information accessible to all methods in this {@code OldDoFn}.
- * Used primarily to output elements.
- */
- public abstract class Context {
-
- /**
- * Returns the {@code PipelineOptions} specified with the
- * {@link org.apache.beam.sdk.runners.PipelineRunner}
- * invoking this {@code OldDoFn}. The {@code PipelineOptions} will
- * be the default running via {@link DoFnTester}.
- */
- public abstract PipelineOptions getPipelineOptions();
-
- /**
- * Adds the given element to the main output {@code PCollection}.
- *
- * <p>Once passed to {@code output} the element should be considered
- * immutable and not be modified in any way. It may be cached or retained
- * by a Beam runner or later steps in the pipeline, or used in
- * other unspecified ways.
- *
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
- * element will have the same timestamp and be in the same windows
- * as the input element passed to {@link OldDoFn#processElement processElement}.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- */
- public abstract void output(OutputT output);
-
- /**
- * Adds the given element to the main output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code outputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
- * must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- */
- public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
-
- /**
- * Adds the given element to the side output {@code PCollection} with the
- * given tag.
- *
- * <p>Once passed to {@code sideOutput} the element should not be modified
- * in any way.
- *
- * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
- * specify the tags of side outputs that it consumes. Non-consumed side
- * outputs, e.g., outputs for monitoring purposes only, don't necessarily
- * need to be specified.
- *
- * <p>The output element will have the same timestamp and be in the same
- * windows as the input element passed to {@link OldDoFn#processElement processElement}.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element. The output element
- * will have a timestamp of negative infinity.
- *
- * @see ParDo#withOutputTags
- */
- public abstract <T> void sideOutput(TupleTag<T> tag, T output);
-
- /**
- * Adds the given element to the specified side output {@code PCollection},
- * with the given timestamp.
- *
- * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
- * modified in any way.
- *
- * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
- * must not be older than the input element's timestamp minus
- * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
- * be in the same windows as the input element.
- *
- * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
- * this will attempt to use the
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
- * of the input {@code PCollection} to determine what windows the element
- * should be in, throwing an exception if the {@code WindowFn} attempts
- * to access any information about the input element except for the
- * timestamp.
- *
- * @see ParDo#withOutputTags
- */
- public abstract <T> void sideOutputWithTimestamp(
- TupleTag<T> tag, T output, Instant timestamp);
-
- /**
- * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
- * specified name and aggregation logic specified by {@link CombineFn}.
- *
- * <p>For internal use only.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and {@link CombineFn} in this
- * context
- */
- @Experimental(Kind.AGGREGATOR)
- public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
-
- /**
- * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
- * usable within this context.
- *
- * <p>This method should be called by runners before {@link OldDoFn#startBundle}
- * is executed.
- */
- @Experimental(Kind.AGGREGATOR)
- protected final void setupDelegateAggregators() {
- for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
- setupDelegateAggregator(aggregator);
- }
-
- aggregatorsAreFinal = true;
- }
-
- private <AggInputT, AggOutputT> void setupDelegateAggregator(
- DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
-
- Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
- aggregator.getName(), aggregator.getCombineFn());
-
- aggregator.setDelegate(delegate);
- }
- }
-
- /**
- * Information accessible when running {@link OldDoFn#processElement}.
- */
- public abstract class ProcessContext extends Context {
-
- /**
- * Returns the input element to be processed.
- *
- * <p>The element should be considered immutable. A Beam runner will not mutate the
- * element, so it is safe to cache, etc. The element should not be mutated by any of the
- * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner
- * runtime, or used in other unspecified ways.
- */
- public abstract InputT element();
-
- /**
- * Returns the value of the side input for the window corresponding to the
- * window of the main input element.
- *
- * <p>See
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
- * for how this corresponding window is determined.
- *
- * @throws IllegalArgumentException if this is not a side input
- * @see ParDo#withSideInputs
- */
- public abstract <T> T sideInput(PCollectionView<T> view);
-
- /**
- * Returns the timestamp of the input element.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract Instant timestamp();
-
- /**
- * Returns the window into which the input element has been assigned.
- *
- * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- *
- * @throws UnsupportedOperationException if this {@link OldDoFn} does
- * not implement {@link RequiresWindowAccess}.
- */
- public abstract BoundedWindow window();
-
- /**
- * Returns information about the pane within this window into which the
- * input element has been assigned.
- *
- * <p>Generally all data is in a single, uninteresting pane unless custom
- * triggering and/or late data has been explicitly requested.
- * See {@link org.apache.beam.sdk.transforms.windowing.Window}
- * for more information.
- */
- public abstract PaneInfo pane();
-
- /**
- * Returns the process context to use for implementing windowing.
- */
- @Experimental
- public abstract WindowingInternals<InputT, OutputT> windowingInternals();
- }
-
- /**
- * Returns the allowed timestamp skew duration, which is the maximum
- * duration that timestamps can be shifted backward in
- * {@link OldDoFn.Context#outputWithTimestamp}.
- *
- * <p>The default value is {@code Duration.ZERO}, in which case
- * timestamps can only be shifted forward to future. For infinite
- * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
- *
- * <p>Note that producing an element whose timestamp is less than the
- * current timestamp may result in late data, i.e. returning a non-zero
- * value here does not impact watermark calculations used for firing
- * windows.
- *
- * @deprecated does not interact well with the watermark.
- */
- @Deprecated
- public Duration getAllowedTimestampSkew() {
- return Duration.ZERO;
- }
-
- /**
- * Interface for signaling that a {@link OldDoFn} needs to access the window the
- * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
- */
- @Experimental
- public interface RequiresWindowAccess {}
-
- public OldDoFn() {
- this(new HashMap<String, DelegatingAggregator<?, ?>>());
- }
-
- public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
- this.aggregators = aggregators;
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- private final Map<String, DelegatingAggregator<?, ?>> aggregators;
-
- /**
- * Protects aggregators from being created after initialization.
- */
- private boolean aggregatorsAreFinal;
-
- /**
- * Prepares this {@link DoFn} instance for processing bundles.
- *
- * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other
- * {@link DoFn} method is called.
- *
- * <p>By default, does nothing.
- */
- public void setup() throws Exception {
- }
-
- /**
- * Prepares this {@code OldDoFn} instance for processing a batch of elements.
- *
- * <p>By default, does nothing.
- */
- public void startBundle(Context c) throws Exception {
- }
-
- /**
- * Processes one input element.
- *
- * <p>The current element of the input {@code PCollection} is returned by
- * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam
- * runner will not mutate the element, so it is safe to cache, etc. The element should not be
- * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
- * the Beam runner, or used in other unspecified ways.
- *
- * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
- * Once passed to {@code output} the element should be considered immutable and not be modified in
- * any way. It may be cached elsewhere, retained by the Beam runner, or used in other
- * unspecified ways.
- *
- * @see ProcessContext
- */
- public abstract void processElement(ProcessContext c) throws Exception;
-
- /**
- * Finishes processing this batch of elements.
- *
- * <p>By default, does nothing.
- */
- public void finishBundle(Context c) throws Exception {
- }
-
- /**
- * Cleans up this {@link DoFn}.
- *
- * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn}
- * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other
- * {@link DoFn} methods will be called after a call to {@link #teardown()}.
- *
- * <p>By default, does nothing.
- */
- public void teardown() throws Exception {
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Returns an {@link Aggregator} with aggregation logic specified by the
- * {@link CombineFn} argument. The name provided must be unique across
- * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
- * during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link CombineFn} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline processing.
- */
- protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
- checkNotNull(name, "name cannot be null");
- checkNotNull(combiner, "combiner cannot be null");
- checkArgument(!aggregators.containsKey(name),
- "Cannot create aggregator with name %s."
- + " An Aggregator with that name already exists within this scope.",
- name);
-
- checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing."
- + " Aggregators should be registered during pipeline construction.");
-
- DelegatingAggregator<AggInputT, AggOutputT> aggregator =
- new DelegatingAggregator<>(name, combiner);
- aggregators.put(name, aggregator);
- return aggregator;
- }
-
- /**
- * Returns an {@link Aggregator} with the aggregation logic specified by the
- * {@link SerializableFunction} argument. The name provided must be unique
- * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
- * created during pipeline construction.
- *
- * @param name the name of the aggregator
- * @param combiner the {@link SerializableFunction} to use in the aggregator
- * @return an aggregator for the provided name and combiner in the scope of
- * this OldDoFn
- * @throws NullPointerException if the name or combiner is null
- * @throws IllegalArgumentException if the given name collides with another
- * aggregator in this scope
- * @throws IllegalStateException if called during pipeline processing.
- */
- protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
- SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
- checkNotNull(combiner, "combiner cannot be null.");
- return createAggregator(name, Combine.IterableCombineFn.of(combiner));
- }
-
- /**
- * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}.
- */
- Collection<Aggregator<?, ?>> getAggregators() {
- return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index 1c59af7..72179a3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -40,7 +40,7 @@ public class NameUtils {
}
private static final String[] STANDARD_NAME_SUFFIXES =
- new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"};
+ new String[]{"DoFn", "CombineFn", "Fn"};
/**
* Pattern to match a non-anonymous inner class.
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
deleted file mode 100644
index f51a6b0..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DelegatingAggregator}.
- */
-@RunWith(JUnit4.class)
-public class DoFnDelegatingAggregatorTest {
-
- @Mock
- private Aggregator<Long, Long> delegate;
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testAddValueWithoutDelegateThrowsException() {
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("cannot be called");
- thrown.expectMessage("DoFn");
-
- aggregator.addValue(21.2);
- }
-
- @Test
- public void testSetDelegateThenAddValueCallsDelegate() {
- String name = "agg";
- CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Long, Long> aggregator =
- (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
-
- aggregator.setDelegate(delegate);
-
- aggregator.addValue(12L);
-
- verify(delegate).addValue(12L);
- }
-
- @Test
- public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- @SuppressWarnings("unchecked")
- Aggregator<Double, Double> secondDelegate =
- mock(Aggregator.class, "secondDelegate");
-
- aggregator.setDelegate(aggregator);
- aggregator.setDelegate(secondDelegate);
-
- aggregator.addValue(2.25);
-
- verify(secondDelegate).addValue(2.25);
- verify(delegate, never()).addValue(anyLong());
- }
-
- @Test
- public void testGetNameReturnsName() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- }
-
- @Test
- public void testGetCombineFnReturnsCombineFn() {
- String name = "agg";
- CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- DelegatingAggregator<Double, Double> aggregator =
- (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
-
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @SuppressWarnings("unchecked")
- private static <T> CombineFn<T, ?, T> mockCombineFn(
- @SuppressWarnings("unused") Class<T> clazz) {
- return mock(CombineFn.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
deleted file mode 100644
index 0db130d..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * A {@link OldDoFn} that does nothing with provided elements. Used for testing
- * methods provided by the {@link OldDoFn} abstract class.
- *
- * @param <InputT> unused.
- * @param <OutputT> unused.
- */
-class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
- @Override
- public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
- }
-
- /**
- * Returns a new NoOp Context.
- */
- public OldDoFn<InputT, OutputT>.Context context() {
- return new NoOpDoFnContext();
- }
-
- /**
- * A {@link OldDoFn.Context} that does nothing and returns exclusively null.
- */
- private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
- @Override
- public PipelineOptions getPipelineOptions() {
- return null;
- }
- @Override
- public void output(OutputT output) {
- }
- @Override
- public void outputWithTimestamp(OutputT output, Instant timestamp) {
- }
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- }
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
- Instant timestamp) {
- }
- @Override
- public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
deleted file mode 100644
index b5cb286..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link OldDoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnContextTest {
-
- @Mock
- private Aggregator<Long, Long> agg;
-
- private OldDoFn<Object, Object> fn;
- private OldDoFn<Object, Object>.Context context;
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
-
- // Need to be real objects to call the constructor, and to reference the
- // outer instance of OldDoFn
- NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
- OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
- fn = spy(noOpFn);
- context = spy(noOpContext);
- }
-
- @Test
- public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
- Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
- Aggregator<Long, Long> delegateAggregator =
- fn.createAggregator("test", combiner);
-
- when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
- context.setupDelegateAggregators();
- delegateAggregator.addValue(1L);
-
- verify(agg).addValue(1L);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
deleted file mode 100644
index 1c767b1..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for OldDoFn.
- */
-@RunWith(JUnit4.class)
-public class OldDoFnTest implements Serializable {
-
- @Rule
- public transient ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testCreateAggregatorWithCombinerSucceeds() {
- String name = "testAggregator";
- Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @Test
- public void testCreateAggregatorWithNullNameThrowsException() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("name cannot be null");
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator(null, Sum.ofLongs());
- }
-
- @Test
- public void testCreateAggregatorWithNullCombineFnThrowsException() {
- CombineFn<Object, Object, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithNullSerializableFnThrowsException() {
- SerializableFunction<Iterable<Object>, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithSameNameThrowsException() {
- String name = "testAggregator";
- CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Cannot create");
- thrown.expectMessage(name);
- thrown.expectMessage("already exists");
-
- doFn.createAggregator(name, combiner);
- }
-
- @Test
- public void testCreateAggregatorsWithDifferentNamesSucceeds() {
- String nameOne = "testAggregator";
- String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = Max.ofDoubles();
-
- OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
-
- Aggregator<Double, Double> aggregatorOne =
- doFn.createAggregator(nameOne, combiner);
- Aggregator<Double, Double> aggregatorTwo =
- doFn.createAggregator(nameTwo, combiner);
-
- assertNotEquals(aggregatorOne, aggregatorTwo);
- }
-
- @Test
- public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
- OldDoFn<String, String> fn = new OldDoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception { }
- };
- OldDoFn<String, String>.Context context = createContext(fn);
- context.setupDelegateAggregators();
-
- thrown.expect(isA(IllegalStateException.class));
- fn.createAggregator("anyAggregate", Max.ofIntegers());
- }
-
- private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
- return fn.new Context() {
- @Override
- public PipelineOptions getPipelineOptions() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void output(String output) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void outputWithTimestamp(String output, Instant timestamp) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <AggInputT, AggOutputT>
- Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
- String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- @Test
- public void testPopulateDisplayDataDefaultBehavior() {
- OldDoFn<String, String> usesDefault =
- new OldDoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {}
- };
-
- DisplayData data = DisplayData.from(usesDefault);
- assertThat(data.items(), empty());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index b81aa36..6848ea4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -42,22 +42,18 @@ public class NameUtilsTest {
@Test
public void testDropsStandardSuffixes() {
- assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", true));
assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", true));
assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", true));
- assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", false));
assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", false));
assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", false));
}
@Test
public void testDropsStandardSuffixesInAllComponents() {
- assertEquals("Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", true));
assertEquals("Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", true));
assertEquals("Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", true));
- assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", false));
assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", false));
assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", false));
}
@@ -79,12 +75,12 @@ public class NameUtilsTest {
/**
* Inner class for simple name test.
*/
- private class EmbeddedOldDoFn {
+ private class EmbeddedDoFn {
- private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {}
+ private class DeeperEmbeddedDoFn extends EmbeddedDoFn {}
- private EmbeddedOldDoFn getEmbedded() {
- return new DeeperEmbeddedOldDoFn();
+ private EmbeddedDoFn getEmbedded() {
+ return new DeeperEmbeddedDoFn();
}
}
@@ -112,18 +108,18 @@ public class NameUtilsTest {
@Test
public void testSimpleName() {
- assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn()));
+ assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedDoFn()));
}
@Test
public void testAnonSimpleName() throws Exception {
- assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {}));
+ assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedDoFn() {}));
}
@Test
public void testNestedSimpleName() {
- EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
- EmbeddedOldDoFn inner = fn.getEmbedded();
+ EmbeddedDoFn fn = new EmbeddedDoFn();
+ EmbeddedDoFn inner = fn.getEmbedded();
assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner));
}
[2/4] beam git commit: Moves OldDoFn to runners-core
Posted by ke...@apache.org.
Moves OldDoFn to runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f8b8c5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f8b8c5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f8b8c5b
Branch: refs/heads/master
Commit: 5f8b8c5b06cfd49c4293a20dff2eea99f1076444
Parents: 77c7505
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jan 17 16:12:39 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 20 13:31:58 2017 -0800
----------------------------------------------------------------------
.../apex/translation/WindowBoundTranslator.java | 2 +-
.../operators/ApexGroupByKeyOperator.java | 2 +-
.../operators/ApexParDoOperator.java | 2 +-
.../beam/runners/core/AssignWindowsDoFn.java | 3 +-
.../apache/beam/runners/core/DoFnAdapters.java | 1 -
.../apache/beam/runners/core/DoFnRunner.java | 1 -
.../apache/beam/runners/core/DoFnRunners.java | 1 -
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 -
.../runners/core/GroupAlsoByWindowsDoFn.java | 1 -
.../core/LateDataDroppingDoFnRunner.java | 1 -
.../org/apache/beam/runners/core/OldDoFn.java | 472 +++++++++++++++++++
.../beam/runners/core/SimpleOldDoFnRunner.java | 3 +-
.../core/DoFnDelegatingAggregatorTest.java | 144 ++++++
.../apache/beam/runners/core/NoOpOldDoFn.java | 72 +++
.../beam/runners/core/OldDoFnContextTest.java | 72 +++
.../apache/beam/runners/core/OldDoFnTest.java | 192 ++++++++
.../runners/core/SimpleOldDoFnRunnerTest.java | 2 +-
.../runners/flink/OldPerKeyCombineFnRunner.java | 2 +-
.../flink/OldPerKeyCombineFnRunners.java | 2 +-
.../functions/FlinkDoFnFunction.java | 2 +-
.../FlinkMergingNonShuffleReduceFunction.java | 2 +-
.../functions/FlinkMultiOutputDoFnFunction.java | 2 +-
.../FlinkMultiOutputProcessContext.java | 2 +-
.../functions/FlinkNoElementAssignContext.java | 2 +-
.../functions/FlinkPartialReduceFunction.java | 2 +-
.../functions/FlinkProcessContextBase.java | 2 +-
.../functions/FlinkReduceFunction.java | 2 +-
.../FlinkSingleOutputProcessContext.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 4 +-
.../sdk/transforms/DelegatingAggregator.java | 2 +-
.../org/apache/beam/sdk/transforms/OldDoFn.java | 465 ------------------
.../org/apache/beam/sdk/util/NameUtils.java | 2 +-
.../DoFnDelegatingAggregatorTest.java | 142 ------
.../apache/beam/sdk/transforms/NoOpOldDoFn.java | 71 ---
.../beam/sdk/transforms/OldDoFnContextTest.java | 69 ---
.../apache/beam/sdk/transforms/OldDoFnTest.java | 187 --------
.../org/apache/beam/sdk/util/NameUtilsTest.java | 20 +-
39 files changed, 982 insertions(+), 978 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
index ef049e1..50af81d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
@@ -23,8 +23,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.core.AssignWindowsDoFn;
import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 173434f..4c2b461 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -44,6 +44,7 @@ import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOption
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -52,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index de4c15d..808001e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -43,6 +43,7 @@ import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.sdk.coders.Coder;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.UserCodeException;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
index 0eb1667..bbf3574 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java
@@ -21,8 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Iterables;
import java.util.Collection;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
index 0f5624f..23aba58 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Context;
import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 7c73a34..66f95db 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index 2f3e93c..f3972ae 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.core.ExecutionContext.StepContext;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index d79683a..ecce4fc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -21,7 +21,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
index 9a2f8fd..7e96136 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.SystemDoFnInternal;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 290171a..9436ccf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -22,7 +22,6 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
new file mode 100644
index 0000000..b099721
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * The argument to {@link ParDo} providing the code to use to process
+ * elements of the input
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ *
+ * <p>See {@link ParDo} for more explanation, examples of use, and
+ * discussion of constraints on {@code OldDoFn}s, including their
+ * serializability, lack of access to global shared mutable state,
+ * requirements for failure tolerance, and benefits of optimization.
+ *
+ * <p>{@code OldDoFn}s can be tested in the context of a particular
+ * {@code Pipeline} by running that {@code Pipeline} on sample input
+ * and then checking its output. Unit testing of an {@code OldDoFn},
+ * separately from any {@code ParDo} transform or {@code Pipeline},
+ * can be done via the {@link DoFnTester} harness.
+ *
+ * <p>{@link DoFn} (currently experimental) offers an alternative
+ * mechanism for accessing {@link ProcessContext#window()} without the need
+ * to implement {@link RequiresWindowAccess}.
+ *
+ * <p>See also {@link #processElement} for details on implementing the transformation
+ * from {@code InputT} to {@code OutputT}.
+ *
+ * @param <InputT> the type of the (main) input elements
+ * @param <OutputT> the type of the (main) output elements
+ * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}.
+ */
+@Deprecated
+public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
+ /**
+ * Information accessible to all methods in this {@code OldDoFn}.
+ * Used primarily to output elements.
+ */
+ public abstract class Context {
+
+ /**
+ * Returns the {@code PipelineOptions} specified with the
+ * {@link org.apache.beam.sdk.runners.PipelineRunner}
+ * invoking this {@code OldDoFn}. The {@code PipelineOptions} will
+ * be the default running via {@link DoFnTester}.
+ */
+ public abstract PipelineOptions getPipelineOptions();
+
+ /**
+ * Adds the given element to the main output {@code PCollection}.
+ *
+ * <p>Once passed to {@code output} the element should be considered
+ * immutable and not be modified in any way. It may be cached or retained
+ * by a Beam runner or later steps in the pipeline, or used in
+ * other unspecified ways.
+ *
+ * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
+ * element will have the same timestamp and be in the same windows
+ * as the input element passed to {@link OldDoFn#processElement processElement}.
+ *
+ * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element. The output element
+ * will have a timestamp of negative infinity.
+ */
+ public abstract void output(OutputT output);
+
+ /**
+ * Adds the given element to the main output {@code PCollection},
+ * with the given timestamp.
+ *
+ * <p>Once passed to {@code outputWithTimestamp} the element should not be
+ * modified in any way.
+ *
+ * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
+ * must not be older than the input element's timestamp minus
+ * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
+ * be in the same windows as the input element.
+ *
+ * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element except for the
+ * timestamp.
+ */
+ public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
+
+ /**
+ * Adds the given element to the side output {@code PCollection} with the
+ * given tag.
+ *
+ * <p>Once passed to {@code sideOutput} the element should not be modified
+ * in any way.
+ *
+ * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
+ * specify the tags of side outputs that it consumes. Non-consumed side
+ * outputs, e.g., outputs for monitoring purposes only, don't necessarily
+ * need to be specified.
+ *
+ * <p>The output element will have the same timestamp and be in the same
+ * windows as the input element passed to {@link OldDoFn#processElement processElement}.
+ *
+ * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element. The output element
+ * will have a timestamp of negative infinity.
+ *
+ * @see ParDo#withOutputTags
+ */
+ public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+ /**
+ * Adds the given element to the specified side output {@code PCollection},
+ * with the given timestamp.
+ *
+ * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+ * modified in any way.
+ *
+ * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
+ * must not be older than the input element's timestamp minus
+ * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will
+ * be in the same windows as the input element.
+ *
+ * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+ * this will attempt to use the
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+ * of the input {@code PCollection} to determine what windows the element
+ * should be in, throwing an exception if the {@code WindowFn} attempts
+ * to access any information about the input element except for the
+ * timestamp.
+ *
+ * @param <T> the type of the elements of the side output identified by {@code tag}
+ * @param tag the tag of the side output to add the element to
+ * @param output the element to add
+ * @param timestamp the timestamp to assign to the output element
+ * @see ParDo#withOutputTags
+ */
+ public abstract <T> void sideOutputWithTimestamp(
+ TupleTag<T> tag, T output, Instant timestamp);
+
+ /**
+ * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
+ * specified name and aggregation logic specified by {@link CombineFn}.
+ *
+ * <p>For internal use only. {@link #setupDelegateAggregators} calls this method once
+ * for every aggregator registered on the enclosing {@link OldDoFn} and installs the
+ * result as that aggregator's delegate.
+ *
+ * @param <AggInputT> the type of the values added to the aggregator
+ * @param <AggOutputT> the type of the aggregated result
+ * @param name the name of the aggregator
+ * @param combiner the {@link CombineFn} to use in the aggregator
+ * @return an aggregator for the provided name and {@link CombineFn} in this
+ * context
+ */
+ @Experimental(Kind.AGGREGATOR)
+ public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+ /**
+  * Binds every {@link Aggregator} registered on the enclosing {@link OldDoFn} to a
+  * delegate created by this context, so the aggregators are usable within it.
+  *
+  * <p>Once this method has run, further calls to {@code createAggregator} on the
+  * enclosing {@link OldDoFn} fail with an {@link IllegalStateException}.
+  *
+  * <p>This method should be called by runners before {@link OldDoFn#startBundle}
+  * is executed.
+  */
+ @Experimental(Kind.AGGREGATOR)
+ protected final void setupDelegateAggregators() {
+   for (DelegatingAggregator<?, ?> registered : aggregators.values()) {
+     // The generic helper captures the wildcard type arguments of each aggregator.
+     setupDelegateAggregator(registered);
+   }
+
+   // From here on no new aggregators may be registered on the enclosing OldDoFn.
+   aggregatorsAreFinal = true;
+ }
+
+ /**
+  * Creates a delegate for the given aggregator through {@link #createAggregatorInternal},
+  * using the aggregator's own name and {@link CombineFn}, and installs it on the aggregator.
+  *
+  * @param aggregator the registered aggregator that receives the new delegate
+  */
+ private <AggInputT, AggOutputT> void setupDelegateAggregator(
+     DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+   aggregator.setDelegate(
+       createAggregatorInternal(aggregator.getName(), aggregator.getCombineFn()));
+ }
+ }
+
+ /**
+ * Information accessible when running {@link OldDoFn#processElement}.
+ *
+ * <p>Extends {@link Context} with access to the input element being processed, its
+ * timestamp, window and pane, and to side inputs.
+ */
+ public abstract class ProcessContext extends Context {
+
+ /**
+ * Returns the input element to be processed.
+ *
+ * <p>The element should be considered immutable. A Beam runner will not mutate the
+ * element, so it is safe to cache, etc. The element should not be mutated by any of the
+ * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner
+ * runtime, or used in other unspecified ways.
+ *
+ * @return the input element currently being processed
+ */
+ public abstract InputT element();
+
+ /**
+ * Returns the value of the side input for the window corresponding to the
+ * window of the main input element.
+ *
+ * <p>See
+ * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
+ * for how this corresponding window is determined.
+ *
+ * @param <T> the type of the side input value
+ * @param view the view identifying the side input to read
+ * @return the value of the side input for the corresponding window
+ * @throws IllegalArgumentException if this is not a side input
+ * @see ParDo#withSideInputs
+ */
+ public abstract <T> T sideInput(PCollectionView<T> view);
+
+ /**
+ * Returns the timestamp of the input element.
+ *
+ * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+ * for more information.
+ *
+ * @return the timestamp of the input element
+ */
+ public abstract Instant timestamp();
+
+ /**
+ * Returns the window into which the input element has been assigned.
+ *
+ * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+ * for more information.
+ *
+ * @return the window of the input element
+ * @throws UnsupportedOperationException if this {@link OldDoFn} does
+ * not implement {@link RequiresWindowAccess}.
+ */
+ public abstract BoundedWindow window();
+
+ /**
+ * Returns information about the pane within this window into which the
+ * input element has been assigned.
+ *
+ * <p>Generally all data is in a single, uninteresting pane unless custom
+ * triggering and/or late data has been explicitly requested.
+ * See {@link org.apache.beam.sdk.transforms.windowing.Window}
+ * for more information.
+ *
+ * @return the pane information of the input element
+ */
+ public abstract PaneInfo pane();
+
+ /**
+ * Returns the process context to use for implementing windowing.
+ *
+ * @return the {@link WindowingInternals} for this element
+ */
+ @Experimental
+ public abstract WindowingInternals<InputT, OutputT> windowingInternals();
+ }
+
+ /**
+ * Returns the allowed timestamp skew duration, which is the maximum
+ * duration that timestamps can be shifted backward in
+ * {@link OldDoFn.Context#outputWithTimestamp}.
+ *
+ * <p>The default value is {@code Duration.ZERO}, in which case
+ * timestamps can only be shifted forward, into the future. For infinite
+ * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+ *
+ * <p>Note that producing an element whose timestamp is less than the
+ * current timestamp may result in late data, i.e. returning a non-zero
+ * value here does not impact watermark calculations used for firing
+ * windows.
+ *
+ * @return the maximum backward shift allowed for output timestamps;
+ * {@code Duration.ZERO} unless overridden
+ * @deprecated does not interact well with the watermark.
+ */
+ @Deprecated
+ public Duration getAllowedTimestampSkew() {
+ return Duration.ZERO;
+ }
+
+ /**
+ * Marker interface for signaling that an {@link OldDoFn} needs to access the window the
+ * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
+ *
+ * <p>The interface declares no methods; implementing it is the whole signal.
+ */
+ @Experimental
+ public interface RequiresWindowAccess {}
+
+ /**
+ * Creates an {@code OldDoFn} whose aggregator registry starts out as an empty
+ * {@link HashMap}.
+ */
+ public OldDoFn() {
+ this(new HashMap<String, DelegatingAggregator<?, ?>>());
+ }
+
+ /**
+ * Creates an {@code OldDoFn} that keeps the aggregators it creates in the given map.
+ *
+ * <p>The map reference is stored as-is: no defensive copy is made and the argument is
+ * not checked for null.
+ *
+ * @param aggregators the map, keyed by aggregator name, that holds the aggregators
+ * created by this {@code OldDoFn}
+ */
+ public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+ this.aggregators = aggregators;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * The aggregators created through {@code createAggregator}, keyed by aggregator name.
+ */
+ private final Map<String, DelegatingAggregator<?, ?>> aggregators;
+
+ /**
+ * Protects aggregators from being created after initialization.
+ *
+ * <p>Set to {@code true} by {@code Context#setupDelegateAggregators}; while it is set,
+ * {@code createAggregator} fails with an {@link IllegalStateException}.
+ */
+ private boolean aggregatorsAreFinal;
+
+ /**
+ * Prepares this {@link OldDoFn} instance for processing bundles.
+ *
+ * <p>{@link #setup()} will be called at most once per {@link OldDoFn} instance, and before
+ * any other {@link OldDoFn} method is called.
+ *
+ * <p>By default, does nothing.
+ *
+ * @throws Exception declared so that overriding implementations may report any failure
+ */
+ public void setup() throws Exception {
+ }
+
+ /**
+ * Prepares this {@code OldDoFn} instance for processing a batch of elements.
+ *
+ * <p>By default, does nothing.
+ *
+ * @param c the {@link Context} supplied by the caller for the batch
+ * @throws Exception declared so that overriding implementations may report any failure
+ */
+ public void startBundle(Context c) throws Exception {
+ }
+
+ /**
+ * Processes one input element.
+ *
+ * <p>The current element of the input {@code PCollection} is returned by
+ * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam
+ * runner will not mutate the element, so it is safe to cache, etc. The element should not be
+ * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
+ * the Beam runner, or used in other unspecified ways.
+ *
+ * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
+ * Once passed to {@code output} the element should be considered immutable and not be modified in
+ * any way. It may be cached elsewhere, retained by the Beam runner, or used in other
+ * unspecified ways.
+ *
+ * @param c the {@link ProcessContext} giving access to the current element and the outputs
+ * @throws Exception declared so that implementations may report any failure
+ * @see ProcessContext
+ */
+ public abstract void processElement(ProcessContext c) throws Exception;
+
+ /**
+ * Finishes processing this batch of elements.
+ *
+ * <p>By default, does nothing.
+ *
+ * @param c the {@link Context} supplied by the caller for the batch
+ * @throws Exception declared so that overriding implementations may report any failure
+ */
+ public void finishBundle(Context c) throws Exception {
+ }
+
+ /**
+ * Cleans up this {@link OldDoFn}.
+ *
+ * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards an
+ * {@link OldDoFn} instance, including due to another {@link OldDoFn} method throwing an
+ * {@link Exception}. No other {@link OldDoFn} methods will be called after a call to
+ * {@link #teardown()}.
+ *
+ * <p>By default, does nothing.
+ *
+ * @throws Exception declared so that overriding implementations may report any failure
+ */
+ public void teardown() throws Exception {
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>By default, does not register any display data. Implementors may override this method
+ * to provide their own display data.
+ *
+ * @param builder the builder to register display data with; left untouched by this default
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Registers and returns a new {@link Aggregator} that combines its inputs with the given
+  * {@link CombineFn}. The name must not already be used by another {@link Aggregator}
+  * created within the OldDoFn, and aggregators can only be created during pipeline
+  * construction.
+  *
+  * @param name the name of the aggregator
+  * @param combiner the {@link CombineFn} to use in the aggregator
+  * @return an aggregator for the provided name and combiner in the scope of
+  *     this OldDoFn
+  * @throws NullPointerException if the name or combiner is null
+  * @throws IllegalArgumentException if the given name collides with another
+  *     aggregator in this scope
+  * @throws IllegalStateException if called during pipeline processing.
+  */
+ protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+     createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+   checkNotNull(name, "name cannot be null");
+   checkNotNull(combiner, "combiner cannot be null");
+
+   // A name collision is reported before the "registry is final" check below.
+   boolean nameIsUnused = !aggregators.containsKey(name);
+   checkArgument(nameIsUnused,
+       "Cannot create aggregator with name %s."
+       + " An Aggregator with that name already exists within this scope.",
+       name);
+
+   boolean registrationOpen = !aggregatorsAreFinal;
+   checkState(registrationOpen, "Cannot create an aggregator during OldDoFn processing."
+       + " Aggregators should be registered during pipeline construction.");
+
+   DelegatingAggregator<AggInputT, AggOutputT> created =
+       new DelegatingAggregator<>(name, combiner);
+   aggregators.put(name, created);
+   return created;
+ }
+
+ /**
+  * Registers and returns a new {@link Aggregator} whose aggregation logic is the given
+  * {@link SerializableFunction}, wrapped via {@code Combine.IterableCombineFn.of}. The name
+  * must not already be used by another {@link Aggregator} created within the OldDoFn, and
+  * aggregators can only be created during pipeline construction.
+  *
+  * @param name the name of the aggregator
+  * @param combiner the {@link SerializableFunction} to use in the aggregator
+  * @return an aggregator for the provided name and combiner in the scope of
+  *     this OldDoFn
+  * @throws NullPointerException if the name or combiner is null
+  * @throws IllegalArgumentException if the given name collides with another
+  *     aggregator in this scope
+  * @throws IllegalStateException if called during pipeline processing.
+  */
+ protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
+     SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
+   // Checked here so a null function is rejected before it is wrapped.
+   checkNotNull(combiner, "combiner cannot be null.");
+   CombineFn<AggInputT, ?, AggInputT> wrapped = Combine.IterableCombineFn.of(combiner);
+   return createAggregator(name, wrapped);
+ }
+
+ /**
+  * Returns an unmodifiable view of the {@link Aggregator Aggregators} created by this
+  * {@code OldDoFn}.
+  */
+ Collection<Aggregator<?, ?>> getAggregators() {
+   Collection<DelegatingAggregator<?, ?>> registered = aggregators.values();
+   return Collections.<Aggregator<?, ?>>unmodifiableCollection(registered);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 9808e56..2fe9226 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -27,11 +27,10 @@ import java.util.List;
import java.util.Set;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
new file mode 100644
index 0000000..b44e8a4
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DelegatingAggregator;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DelegatingAggregator}, using aggregators created through
+ * {@code OldDoFn#createAggregator}.
+ */
+@RunWith(JUnit4.class)
+public class DoFnDelegatingAggregatorTest {
+
+  /** Mock delegate for {@code Long} aggregators; initialized by {@link #setup()}. */
+  @Mock
+  private Aggregator<Long, Long> delegate;
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** Initializes the {@link Mock}-annotated fields before each test. */
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Adding a value before any delegate is installed must fail with an explanatory message. */
+  @Test
+  public void testAddValueWithoutDelegateThrowsException() {
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("cannot be called");
+    thrown.expectMessage("DoFn");
+
+    aggregator.addValue(21.2);
+  }
+
+  /** A value added after a delegate is installed must be forwarded to that delegate. */
+  @Test
+  public void testSetDelegateThenAddValueCallsDelegate() {
+    String name = "agg";
+    CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Long, Long> aggregator =
+        (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
+
+    aggregator.setDelegate(delegate);
+
+    aggregator.addValue(12L);
+
+    verify(delegate).addValue(12L);
+  }
+
+  /**
+   * Installing a second delegate must redirect values to it, and the delegate that was
+   * replaced must no longer receive them.
+   */
+  @Test
+  public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    // Use a real mock as the first delegate: installing the aggregator as its own delegate
+    // would leave nothing to verify about the delegate that gets replaced.
+    @SuppressWarnings("unchecked")
+    Aggregator<Double, Double> firstDelegate =
+        mock(Aggregator.class, "firstDelegate");
+    @SuppressWarnings("unchecked")
+    Aggregator<Double, Double> secondDelegate =
+        mock(Aggregator.class, "secondDelegate");
+
+    aggregator.setDelegate(firstDelegate);
+    aggregator.setDelegate(secondDelegate);
+
+    aggregator.addValue(2.25);
+
+    verify(secondDelegate).addValue(2.25);
+    verify(firstDelegate, never()).addValue(2.25);
+    // The Long-typed field mock is never wired to this aggregator and must stay untouched.
+    verify(delegate, never()).addValue(anyLong());
+  }
+
+  /** The aggregator must report the name it was created with. */
+  @Test
+  public void testGetNameReturnsName() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    assertEquals(name, aggregator.getName());
+  }
+
+  /** The aggregator must report the {@link CombineFn} it was created with. */
+  @Test
+  public void testGetCombineFnReturnsCombineFn() {
+    String name = "agg";
+    CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
+
+    OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>();
+
+    DelegatingAggregator<Double, Double> aggregator =
+        (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
+
+    assertEquals(combiner, aggregator.getCombineFn());
+  }
+
+  /**
+   * Creates a mock {@link CombineFn} whose input and output type is {@code T}.
+   *
+   * @param clazz unused at runtime; only pins the type parameter {@code T} at the call site
+   */
+  @SuppressWarnings("unchecked")
+  private static <T> CombineFn<T, ?, T> mockCombineFn(
+      @SuppressWarnings("unused") Class<T> clazz) {
+    return mock(CombineFn.class);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
new file mode 100644
index 0000000..5cbea8c
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * An {@link OldDoFn} that does nothing with provided elements. Used for testing
+ * methods provided by the {@link OldDoFn} abstract class.
+ *
+ * @param <InputT> unused.
+ * @param <OutputT> unused.
+ */
+class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
+ /**
+ * Ignores the element being processed.
+ */
+ @Override
+ public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
+ }
+
+ /**
+ * Returns a new NoOp Context bound to this instance.
+ */
+ public OldDoFn<InputT, OutputT>.Context context() {
+ return new NoOpDoFnContext();
+ }
+
+ /**
+ * A {@link OldDoFn.Context} that does nothing and returns exclusively null: the output
+ * methods discard their arguments, and the methods with a return value return null.
+ */
+ private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return null;
+ }
+ @Override
+ public void output(OutputT output) {
+ }
+ @Override
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
+ }
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T output) {
+ }
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output,
+ Instant timestamp) {
+ }
+ // Returns null rather than an aggregator; tests that need a delegate stub this method.
+ @Override
+ public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
new file mode 100644
index 0000000..a1cd49d
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Sum;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link OldDoFn.Context}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnContextTest {
+
+ // Mock aggregator handed out as the delegate by the stubbed context.
+ @Mock
+ private Aggregator<Long, Long> agg;
+
+ // Mockito spies wrapping a real NoOpOldDoFn and a context obtained from it.
+ private OldDoFn<Object, Object> fn;
+ private OldDoFn<Object, Object>.Context context;
+
+ /**
+ * Initializes the mock field and creates the spied {@code OldDoFn} and context.
+ */
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ // Need to be real objects to call the constructor, and to reference the
+ // outer instance of OldDoFn
+ NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>();
+ OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
+
+ fn = spy(noOpFn);
+ context = spy(noOpContext);
+ }
+
+ /**
+ * Registers an aggregator on the {@code OldDoFn}, stubs the context to hand out the mock
+ * as its delegate, and checks that a value added after
+ * {@code setupDelegateAggregators()} reaches the mock.
+ */
+ @Test
+ public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
+ Combine.BinaryCombineLongFn combiner = Sum.ofLongs();
+ Aggregator<Long, Long> delegateAggregator =
+ fn.createAggregator("test", combiner);
+
+ // Stub the delegate factory for exactly the name and combiner registered above.
+ when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+ context.setupDelegateAggregators();
+ delegateAggregator.addValue(1L);
+
+ verify(agg).addValue(1L);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
new file mode 100644
index 0000000..651bc72
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for the aggregator registration and the default display data of {@code OldDoFn}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnTest implements Serializable {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  /** An aggregator created from a {@link CombineFn} reports the name and combiner given. */
+  @Test
+  public void testCreateAggregatorWithCombinerSucceeds() {
+    OldDoFn<Void, Void> fn = new NoOpOldDoFn<>();
+    String aggregatorName = "testAggregator";
+    Combine.BinaryCombineLongFn sum = Sum.ofLongs();
+
+    Aggregator<Long, Long> created = fn.createAggregator(aggregatorName, sum);
+
+    assertEquals(aggregatorName, created.getName());
+    assertEquals(sum, created.getCombineFn());
+  }
+
+  /** A null name is rejected with a {@link NullPointerException}. */
+  @Test
+  public void testCreateAggregatorWithNullNameThrowsException() {
+    OldDoFn<Void, Void> fn = new NoOpOldDoFn<>();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("name cannot be null");
+
+    fn.createAggregator(null, Sum.ofLongs());
+  }
+
+  /** A null {@link CombineFn} is rejected with a {@link NullPointerException}. */
+  @Test
+  public void testCreateAggregatorWithNullCombineFnThrowsException() {
+    OldDoFn<Void, Void> fn = new NoOpOldDoFn<>();
+    // The declared type selects the CombineFn overload of createAggregator.
+    CombineFn<Object, Object, Object> missingCombiner = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    fn.createAggregator("testAggregator", missingCombiner);
+  }
+
+  /** A null {@link SerializableFunction} is rejected with a {@link NullPointerException}. */
+  @Test
+  public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+    OldDoFn<Void, Void> fn = new NoOpOldDoFn<>();
+    // The declared type selects the SerializableFunction overload of createAggregator.
+    SerializableFunction<Iterable<Object>, Object> missingFunction = null;
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("combiner cannot be null");
+
+    fn.createAggregator("testAggregator", missingFunction);
+  }
+
+  /** Registering a second aggregator under an existing name is rejected. */
+  @Test
+  public void testCreateAggregatorWithSameNameThrowsException() {
+    OldDoFn<Void, Void> fn = new NoOpOldDoFn<>();
+    String sharedName = "testAggregator";
+    CombineFn<Double, ?, Double> max = Max.ofDoubles();
+
+    // The first registration succeeds; only the second one below is expected to throw.
+    fn.createAggregator(sharedName, max);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot create");
+    thrown.expectMessage(sharedName);
+    thrown.expectMessage("already exists");
+
+    fn.createAggregator(sharedName, max);
+  }
+
+  /** Two aggregators with different names can share one combiner and are distinct. */
+  @Test
+  public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+    OldDoFn<Void, Void> fn = new NoOpOldDoFn<>();
+    CombineFn<Double, ?, Double> max = Max.ofDoubles();
+
+    Aggregator<Double, Double> first = fn.createAggregator("testAggregator", max);
+    Aggregator<Double, Double> second = fn.createAggregator("aggregatorPrime", max);
+
+    assertNotEquals(first, second);
+  }
+
+  /** Once a context has set up the delegate aggregators, no new aggregator can be created. */
+  @Test
+  public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception {
+    OldDoFn<String, String> fn = new OldDoFn<String, String>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception { }
+    };
+    createContext(fn).setupDelegateAggregators();
+
+    thrown.expect(isA(IllegalStateException.class));
+    fn.createAggregator("anyAggregate", Max.ofIntegers());
+  }
+
+  /**
+   * Returns a {@code Context} of the given {@code OldDoFn} in which every abstract method
+   * throws {@link UnsupportedOperationException}.
+   */
+  private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) {
+    return fn.new Context() {
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void output(String output) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void outputWithTimestamp(String output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <T> void sideOutput(TupleTag<T> tag, T output) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <AggInputT, AggOutputT>
+          Aggregator<AggInputT, AggOutputT> createAggregatorInternal(
+              String name, CombineFn<AggInputT, ?, AggOutputT> combiner) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /** An {@code OldDoFn} that does not override populateDisplayData has no display items. */
+  @Test
+  public void testPopulateDisplayDataDefaultBehavior() {
+    OldDoFn<String, String> defaultFn =
+        new OldDoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {}
+        };
+
+    assertThat(DisplayData.from(defaultFn).items(), empty());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
index 4610069..97da9ee 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java
@@ -23,7 +23,7 @@ import static org.mockito.Mockito.mock;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
+
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
index 5d676dc..71c3aa4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.flink;
import java.io.Serializable;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
/**
* An interface that runs a {@link PerKeyCombineFn} with unified APIs using
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
index 8ebeadf..90894f2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java
@@ -17,13 +17,13 @@
*/
package org.apache.beam.runners.flink;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PerKeyCombineFnRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.values.PCollectionView;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 2a4a68e..8b2bcc6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 1b43172..5ec6a77 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -24,12 +24,12 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index a97bd46..aeeabbf 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions;
import java.util.Map;
import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index a3d2b18..7882b5f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
import java.util.Collection;
import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index c890272..ad7255b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.functions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index f5a9087..7db30d1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -24,12 +24,12 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index 53b9803..e955679 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -24,11 +24,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.TimerInternals;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index a3fa0d4..81e37f4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -26,12 +26,12 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner;
import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
index 529b1cc..0db7f5a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions;
import java.util.Collection;
import java.util.Map;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 90cdf4c..ac85b3c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -43,7 +44,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index cd6b5aa..d4273b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -42,12 +42,12 @@ import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
index 0c5be90..4d80a39 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java
@@ -21,7 +21,6 @@ import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -41,8 +40,7 @@ public class DoFnInfo<InputT, OutputT> implements Serializable {
private final Map<Long, TupleTag<?>> outputMap;
/**
- * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a
- * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob.
+ * Creates a {@link DoFnInfo} for the given {@link DoFn}.
*/
public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn(
DoFn<InputT, OutputT> doFn,
http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
index e03d3b1..cfaf0a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
* @param <AggInputT> the type of input element
* @param <AggOutputT> the type of output element
*/
-class DelegatingAggregator<AggInputT, AggOutputT>
+public class DelegatingAggregator<AggInputT, AggOutputT>
implements Aggregator<AggInputT, AggOutputT>, Serializable {
private static final AtomicInteger ID_GEN = new AtomicInteger();
private final int id;
[3/4] beam git commit: Point to new Dataflow worker
Posted by ke...@apache.org.
Point to new Dataflow worker
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a2b94eca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a2b94eca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a2b94eca
Branch: refs/heads/master
Commit: a2b94ecabd80ef68654ba3bc8ffa0a0c3759316b
Parents: 5f8b8c5
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jan 20 13:31:30 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jan 20 13:32:19 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a2b94eca/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 161a897..9976ed9 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
environment.major.version=6
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170119
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170120
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170119
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170120
[4/4] beam git commit: This closes #1636: Move OldDoFn to runners-core
Posted by ke...@apache.org.
This closes #1636: Move OldDoFn to runners-core
Point to new Dataflow worker
Moves OldDoFn to runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a6810372
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a6810372
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a6810372
Branch: refs/heads/master
Commit: a6810372b003adf24bdbe34ed764a63841af9b99
Parents: 77c7505 a2b94ec
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 20 14:22:38 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 20 14:22:38 2017 -0800
----------------------------------------------------------------------
.../apex/translation/WindowBoundTranslator.java | 2 +-
.../operators/ApexGroupByKeyOperator.java | 2 +-
.../operators/ApexParDoOperator.java | 2 +-
.../beam/runners/core/AssignWindowsDoFn.java | 3 +-
.../apache/beam/runners/core/DoFnAdapters.java | 1 -
.../apache/beam/runners/core/DoFnRunner.java | 1 -
.../apache/beam/runners/core/DoFnRunners.java | 1 -
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 -
.../runners/core/GroupAlsoByWindowsDoFn.java | 1 -
.../core/LateDataDroppingDoFnRunner.java | 1 -
.../org/apache/beam/runners/core/OldDoFn.java | 472 +++++++++++++++++++
.../beam/runners/core/SimpleOldDoFnRunner.java | 3 +-
.../core/DoFnDelegatingAggregatorTest.java | 144 ++++++
.../apache/beam/runners/core/NoOpOldDoFn.java | 72 +++
.../beam/runners/core/OldDoFnContextTest.java | 72 +++
.../apache/beam/runners/core/OldDoFnTest.java | 192 ++++++++
.../runners/core/SimpleOldDoFnRunnerTest.java | 2 +-
.../runners/flink/OldPerKeyCombineFnRunner.java | 2 +-
.../flink/OldPerKeyCombineFnRunners.java | 2 +-
.../functions/FlinkDoFnFunction.java | 2 +-
.../FlinkMergingNonShuffleReduceFunction.java | 2 +-
.../functions/FlinkMultiOutputDoFnFunction.java | 2 +-
.../FlinkMultiOutputProcessContext.java | 2 +-
.../functions/FlinkNoElementAssignContext.java | 2 +-
.../functions/FlinkPartialReduceFunction.java | 2 +-
.../functions/FlinkProcessContextBase.java | 2 +-
.../functions/FlinkReduceFunction.java | 2 +-
.../FlinkSingleOutputProcessContext.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 2 +-
.../wrappers/streaming/WindowDoFnOperator.java | 2 +-
.../beam/runners/dataflow/util/DoFnInfo.java | 4 +-
.../beam/runners/dataflow/dataflow.properties | 4 +-
.../sdk/transforms/DelegatingAggregator.java | 2 +-
.../org/apache/beam/sdk/transforms/OldDoFn.java | 465 ------------------
.../org/apache/beam/sdk/util/NameUtils.java | 2 +-
.../DoFnDelegatingAggregatorTest.java | 142 ------
.../apache/beam/sdk/transforms/NoOpOldDoFn.java | 71 ---
.../beam/sdk/transforms/OldDoFnContextTest.java | 69 ---
.../apache/beam/sdk/transforms/OldDoFnTest.java | 187 --------
.../org/apache/beam/sdk/util/NameUtilsTest.java | 20 +-
40 files changed, 984 insertions(+), 980 deletions(-)
----------------------------------------------------------------------