You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/08 17:11:14 UTC
[1/2] incubator-beam git commit: Remove misc occurrences of OldDoFn
Repository: incubator-beam
Updated Branches:
refs/heads/master 92ff63d3b -> 04a41ee54
Remove misc occurrences of OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44e17d1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44e17d1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44e17d1c
Branch: refs/heads/master
Commit: 44e17d1c97babd487584cc78690505bdf57704b2
Parents: 92ff63d
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 14:17:01 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Dec 8 09:03:59 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/AggregatorPipelineExtractor.java | 5 ++--
.../sdk/transforms/AggregatorRetriever.java | 2 +-
.../org/apache/beam/sdk/transforms/Combine.java | 4 +--
.../apache/beam/sdk/util/ExecutionContext.java | 8 +++---
.../sdk/AggregatorPipelineExtractorTest.java | 20 +++++++-------
.../beam/sdk/transforms/DoFnTesterTest.java | 2 +-
.../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++++++++++----------
7 files changed, 35 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
index d2130d0..ade5978 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -70,9 +70,10 @@ class AggregatorPipelineExtractor {
private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
if (transform != null) {
if (transform instanceof ParDo.Bound) {
- return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
+ return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getNewFn());
} else if (transform instanceof ParDo.BoundMulti) {
- return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn());
+ return AggregatorRetriever.getAggregators(
+ ((ParDo.BoundMulti<?, ?>) transform).getNewFn());
}
}
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
index abed843..ce47e22 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java
@@ -30,7 +30,7 @@ public final class AggregatorRetriever {
/**
* Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}.
*/
- public static Collection<Aggregator<?, ?>> getAggregators(OldDoFn<?, ?> fn) {
+ public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) {
return fn.getAggregators();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index be063e2..4127d94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2392,8 +2392,8 @@ public class Combine {
PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
PCollection<KV<K, OutputT>> output = input.apply(ParDo.of(
- new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
- @Override
+ new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() {
+ @ProcessElement
public void processElement(final ProcessContext c) {
K key = c.element().getKey();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
index f2a79bd..4429d76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
@@ -41,14 +41,14 @@ public interface ExecutionContext {
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#output}
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
* is called.
*/
void noteOutput(WindowedValue<?> output);
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#sideOutput}
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
* is called.
*/
void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
@@ -70,14 +70,14 @@ public interface ExecutionContext {
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#output}
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
* is called.
*/
void noteOutput(WindowedValue<?> output);
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#sideOutput}
+ * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
* is called.
*/
void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index b4de768..c4e9b8a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -33,9 +33,9 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -68,7 +68,7 @@ public class AggregatorPipelineExtractorTest {
@SuppressWarnings("rawtypes")
ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
- when(bound.getFn()).thenReturn(fn);
+ when(bound.getNewFn()).thenReturn(fn);
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
@@ -96,7 +96,7 @@ public class AggregatorPipelineExtractorTest {
@SuppressWarnings("rawtypes")
ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti");
AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
- when(bound.getFn()).thenReturn(fn);
+ when(bound.getNewFn()).thenReturn(fn);
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -126,8 +126,8 @@ public class AggregatorPipelineExtractorTest {
@SuppressWarnings("rawtypes")
ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
- when(bound.getFn()).thenReturn(fn);
- when(otherBound.getFn()).thenReturn(fn);
+ when(bound.getNewFn()).thenReturn(fn);
+ when(otherBound.getNewFn()).thenReturn(fn);
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -162,7 +162,7 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
- when(bound.getFn()).thenReturn(fn);
+ when(bound.getNewFn()).thenReturn(fn);
@SuppressWarnings("rawtypes")
ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
@@ -170,7 +170,7 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
- when(otherBound.getFn()).thenReturn(otherFn);
+ when(otherBound.getNewFn()).thenReturn(otherFn);
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);
@@ -208,7 +208,7 @@ public class AggregatorPipelineExtractorTest {
}
}
- private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> {
+ private static class AggregatorProvidingDoFn<InT, OuT> extends DoFn<InT, OuT> {
public <InputT, OutT> Aggregator<InputT, OutT> addAggregator(
CombineFn<InputT, ?, OutT> combiner) {
return createAggregator(randomName(), combiner);
@@ -218,8 +218,8 @@ public class AggregatorPipelineExtractorTest {
return UUID.randomUUID().toString();
}
- @Override
- public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception {
+ @ProcessElement
+ public void processElement(DoFn<InT, OuT>.ProcessContext c) throws Exception {
fail();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index b47465e..2dafa27 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -399,7 +399,7 @@ public class DoFnTesterTest {
/**
* A {@link DoFn} that adds values to an aggregator and converts input to String in
- * {@link OldDoFn#processElement}.
+ * {@link DoFn.ProcessElement @ProcessElement}.
*/
private static class CounterDoFn extends DoFn<Long, String> {
Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index f69c867..9bc8a64 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -50,7 +50,7 @@ public class ParDoLifecycleTest implements Serializable {
PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
.and(p.apply("Polite", Create.of(3, 5, 6, 7)))
.apply(Flatten.<Integer>pCollections())
- .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()));
+ .apply(ParDo.of(new CallSequenceEnforcingDoFn<Integer>()));
p.run();
}
@@ -62,19 +62,19 @@ public class ParDoLifecycleTest implements Serializable {
PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4)))
.and(p.apply("Polite", Create.of(3, 5, 6, 7)))
.apply(Flatten.<Integer>pCollections())
- .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())
+ .apply(ParDo.of(new CallSequenceEnforcingDoFn<Integer>())
.withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty()));
p.run();
}
- private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> {
+ private static class CallSequenceEnforcingDoFn<T> extends DoFn<T, T> {
private boolean setupCalled = false;
private int startBundleCalls = 0;
private int finishBundleCalls = 0;
private boolean teardownCalled = false;
- @Override
+ @Setup
public void setup() {
assertThat("setup should not be called twice", setupCalled, is(false));
assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0));
@@ -83,7 +83,7 @@ public class ParDoLifecycleTest implements Serializable {
setupCalled = true;
}
- @Override
+ @StartBundle
public void startBundle(Context c) {
assertThat("setup should have been called", setupCalled, is(true));
assertThat(
@@ -94,7 +94,7 @@ public class ParDoLifecycleTest implements Serializable {
startBundleCalls++;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
assertThat(
@@ -104,7 +104,7 @@ public class ParDoLifecycleTest implements Serializable {
assertThat("teardown should not have been called", teardownCalled, is(false));
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) {
assertThat("startBundle should have been called", startBundleCalls, greaterThan(0));
assertThat(
@@ -115,7 +115,7 @@ public class ParDoLifecycleTest implements Serializable {
finishBundleCalls++;
}
- @Override
+ @Teardown
public void teardown() {
assertThat(setupCalled, is(true));
assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls)));
@@ -345,7 +345,7 @@ public class ParDoLifecycleTest implements Serializable {
}
}
- private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> {
+ private static class ExceptionThrowingOldFn extends DoFn<Object, Object> {
static AtomicBoolean teardownCalled = new AtomicBoolean(false);
private final MethodForException toThrow;
@@ -355,22 +355,22 @@ public class ParDoLifecycleTest implements Serializable {
this.toThrow = toThrow;
}
- @Override
+ @Setup
public void setup() throws Exception {
throwIfNecessary(MethodForException.SETUP);
}
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
throwIfNecessary(MethodForException.START_BUNDLE);
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
throwIfNecessary(MethodForException.PROCESS_ELEMENT);
}
- @Override
+ @FinishBundle
public void finishBundle(Context c) throws Exception {
throwIfNecessary(MethodForException.FINISH_BUNDLE);
}
@@ -382,7 +382,7 @@ public class ParDoLifecycleTest implements Serializable {
}
}
- @Override
+ @Teardown
public void teardown() {
if (!thrown) {
fail("Excepted to have a processing method throw an exception");
[2/2] incubator-beam git commit: [BEAM-498] Remove misc occurrences of OldDoFn
Posted by lc...@apache.org.
[BEAM-498] Remove misc occurrences of OldDoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/04a41ee5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/04a41ee5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/04a41ee5
Branch: refs/heads/master
Commit: 04a41ee54ced13e7d896b7d31ffa99a3273af2dd
Parents: 92ff63d 44e17d1
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 8 09:04:27 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Dec 8 09:04:27 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/AggregatorPipelineExtractor.java | 5 ++--
.../sdk/transforms/AggregatorRetriever.java | 2 +-
.../org/apache/beam/sdk/transforms/Combine.java | 4 +--
.../apache/beam/sdk/util/ExecutionContext.java | 8 +++---
.../sdk/AggregatorPipelineExtractorTest.java | 20 +++++++-------
.../beam/sdk/transforms/DoFnTesterTest.java | 2 +-
.../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++++++++++----------
7 files changed, 35 insertions(+), 34 deletions(-)
----------------------------------------------------------------------