You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:09 UTC
[09/10] incubator-beam git commit: Renames ParDo.getNewFn to getFn
Renames ParDo.getNewFn to getFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b502fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b502fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b502fc1
Branch: refs/heads/master
Commit: 6b502fc111af266c7b1a0e6f7d473c36f57281a2
Parents: 33ed323
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:29:41 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:59:11 2016 -0800
----------------------------------------------------------------------
.../translation/ParDoBoundMultiTranslator.java | 2 +-
.../apex/translation/ParDoBoundTranslator.java | 2 +-
.../beam/runners/core/SplittableParDo.java | 4 ++--
.../runners/direct/ParDoEvaluatorFactory.java | 2 +-
.../direct/ParDoMultiOverrideFactory.java | 2 +-
.../ParDoSingleViaMultiOverrideFactory.java | 4 ++--
.../direct/StatefulParDoEvaluatorFactory.java | 4 ++--
.../FlinkBatchTransformTranslators.java | 4 ++--
.../FlinkStreamingTransformTranslators.java | 12 +++++------
.../dataflow/DataflowPipelineTranslator.java | 8 +++----
.../spark/translation/TransformTranslator.java | 4 ++--
.../streaming/StreamingTransformTranslator.java | 4 ++--
.../beam/sdk/AggregatorPipelineExtractor.java | 4 ++--
.../org/apache/beam/sdk/transforms/ParDo.java | 22 ++++++++++----------
.../sdk/AggregatorPipelineExtractorTest.java | 12 +++++------
15 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 574ce8f..bff7652 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -53,7 +53,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.stateDeclarations().size() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index de78628..3b6eb6e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -40,7 +40,7 @@ class ParDoBoundTranslator<InputT, OutputT>
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
if (signature.stateDeclarations().size() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 720db63..f8d12ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -104,7 +104,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
checkNotNull(parDo, "parDo must not be null");
this.parDo = parDo;
checkArgument(
- DoFnSignatures.getSignature(parDo.getNewFn().getClass()).processElement().isSplittable(),
+ DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(),
"fn must be a splittable DoFn");
}
@@ -114,7 +114,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
private PCollectionTuple applyTyped(PCollection<InputT> input) {
- DoFn<InputT, OutputT> fn = parDo.getNewFn();
+ DoFn<InputT, OutputT> fn = parDo.getFn();
Coder<RestrictionT> restrictionCoder =
DoFnInvokers.invokerFor(fn)
.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index ec5dc2c..b4684e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -65,7 +65,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
application;
ParDo.BoundMulti<InputT, OutputT> transform = parDoApplication.getTransform();
- final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
@SuppressWarnings({"unchecked", "rawtypes"})
TransformEvaluator<T> evaluator =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 4e7914f..4401434 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -46,7 +46,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
public PTransform<PCollection<? extends InputT>, PCollectionTuple> override(
ParDo.BoundMulti<InputT, OutputT> transform) {
- DoFn<InputT, OutputT> fn = transform.getNewFn();
+ DoFn<InputT, OutputT> fn = transform.getFn();
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
return new SplittableParDo(transform);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 10530bb..5fcf49c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -56,12 +56,12 @@ class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
PCollectionTuple outputs =
input.apply(
- ParDo.of(underlyingParDo.getNewFn())
+ ParDo.of(underlyingParDo.getFn())
.withSideInputs(underlyingParDo.getSideInputs())
.withOutputTags(mainOutputTag, TupleTagList.empty()));
PCollection<OutputT> output = outputs.get(mainOutputTag);
- output.setTypeDescriptor(underlyingParDo.getNewFn().getOutputTypeDescriptor());
+ output.setTypeDescriptor(underlyingParDo.getFn().getOutputTypeDescriptor());
return output;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 1f3286c..1f64d9a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -86,7 +86,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
throws Exception {
final DoFn<KV<K, InputT>, OutputT> doFn =
- application.getTransform().getUnderlyingParDo().getNewFn();
+ application.getTransform().getUnderlyingParDo().getFn();
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
// If the DoFn is stateful, schedule state clearing.
@@ -141,7 +141,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
WindowingStrategy<?, ?> windowingStrategy = pc.getWindowingStrategy();
BoundedWindow window = transformOutputWindow.getWindow();
final DoFn<?, ?> doFn =
- transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getNewFn();
+ transformOutputWindow.getTransform().getTransform().getUnderlyingParDo().getFn();
final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
final DirectStepContext stepContext =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 497b293..eb625b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -516,7 +516,7 @@ class FlinkBatchTransformTranslators {
ParDo.Bound<InputT, OutputT> transform,
FlinkBatchTranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
DataSet<WindowedValue<InputT>> inputDataSet =
@@ -562,7 +562,7 @@ class FlinkBatchTransformTranslators {
public void translateNode(
ParDo.BoundMulti<InputT, OutputT> transform,
FlinkBatchTranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 42ef630..ffa6d16 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -338,7 +338,7 @@ public class FlinkStreamingTransformTranslators {
ParDo.Bound<InputT, OutputT> transform,
FlinkStreamingTranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
WindowingStrategy<?, ?> windowingStrategy =
@@ -358,7 +358,7 @@ public class FlinkStreamingTransformTranslators {
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
new DoFnOperator<>(
- transform.getNewFn(),
+ transform.getFn(),
inputTypeInfo,
new TupleTag<OutputT>("main output"),
Collections.<TupleTag<?>>emptyList(),
@@ -381,7 +381,7 @@ public class FlinkStreamingTransformTranslators {
DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
new DoFnOperator<>(
- transform.getNewFn(),
+ transform.getFn(),
inputTypeInfo,
new TupleTag<OutputT>("main output"),
Collections.<TupleTag<?>>emptyList(),
@@ -490,7 +490,7 @@ public class FlinkStreamingTransformTranslators {
ParDo.BoundMulti<InputT, OutputT> transform,
FlinkStreamingTranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
// we assume that the transformation does not change the windowing strategy.
@@ -515,7 +515,7 @@ public class FlinkStreamingTransformTranslators {
if (sideInputs.isEmpty()) {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
new DoFnOperator<>(
- transform.getNewFn(),
+ transform.getFn(),
inputTypeInfo,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
@@ -542,7 +542,7 @@ public class FlinkStreamingTransformTranslators {
DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
new DoFnOperator<>(
- transform.getNewFn(),
+ transform.getFn(),
inputTypeInfo,
transform.getMainOutputTag(),
transform.getSideOutputTags().getAll(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index a56690c..8d2b076 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -955,14 +955,14 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform,
TranslationContext context) {
- rejectStatefulDoFn(transform.getNewFn());
+ rejectStatefulDoFn(transform.getFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
BiMap<Long, TupleTag<?>> outputMap =
translateOutputs(context.getOutput(transform), context);
translateFn(
- transform.getNewFn(),
+ transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs(),
context.getInput(transform).getCoder(),
@@ -985,13 +985,13 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform,
TranslationContext context) {
- rejectStatefulDoFn(transform.getNewFn());
+ rejectStatefulDoFn(transform.getFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
long mainOutput = context.addOutput(context.getOutput(transform));
translateFn(
- transform.getNewFn(),
+ transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs(),
context.getInput(transform).getCoder(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac91892..5dd6beb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -227,7 +227,7 @@ public final class TransformTranslator {
return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
@@ -250,7 +250,7 @@ public final class TransformTranslator {
return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 27204ed..070ccbb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -344,7 +344,7 @@ final class StreamingTransformTranslator {
@Override
public void evaluate(final ParDo.Bound<InputT, OutputT> transform,
final EvaluationContext context) {
- final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
@@ -378,7 +378,7 @@ final class StreamingTransformTranslator {
@Override
public void evaluate(final ParDo.BoundMulti<InputT, OutputT> transform,
final EvaluationContext context) {
- final DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ final DoFn<InputT, OutputT> doFn = transform.getFn();
rejectStateAndTimers(doFn);
final SparkRuntimeContext runtimeContext = context.getRuntimeContext();
final Map<TupleTag<?>, KV<WindowingStrategy<?, ?>, BroadcastHelper<?>>> sideInputs =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
index ade5978..c79f779 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -70,10 +70,10 @@ class AggregatorPipelineExtractor {
private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) {
if (transform != null) {
if (transform instanceof ParDo.Bound) {
- return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getNewFn());
+ return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn());
} else if (transform instanceof ParDo.BoundMulti) {
return AggregatorRetriever.getAggregators(
- ((ParDo.BoundMulti<?, ?>) transform).getNewFn());
+ ((ParDo.BoundMulti<?, ?>) transform).getFn());
}
}
return Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index d2149c0..f897f82 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -745,7 +745,7 @@ public class ParDo {
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
checkArgument(
- !isSplittable(getNewFn()),
+ !isSplittable(getFn()),
"%s does not support Splittable DoFn",
input.getPipeline().getOptions().getRunner().getName());
validateWindowType(input, fn);
@@ -753,7 +753,7 @@ public class ParDo {
input.getPipeline(),
input.getWindowingStrategy(),
input.isBounded())
- .setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
+ .setTypeDescriptor(getFn().getOutputTypeDescriptor());
}
@Override
@@ -761,14 +761,14 @@ public class ParDo {
protected Coder<OutputT> getDefaultOutputCoder(PCollection<? extends InputT> input)
throws CannotProvideCoderException {
return input.getPipeline().getCoderRegistry().getDefaultCoder(
- getNewFn().getOutputTypeDescriptor(),
- getNewFn().getInputTypeDescriptor(),
+ getFn().getOutputTypeDescriptor(),
+ getFn().getInputTypeDescriptor(),
((PCollection<InputT>) input).getCoder());
}
@Override
protected String getKindString() {
- Class<?> clazz = getNewFn().getClass();
+ Class<?> clazz = getFn().getClass();
if (clazz.isAnonymousClass()) {
return "AnonymousParDo";
} else {
@@ -789,7 +789,7 @@ public class ParDo {
ParDo.populateDisplayData(builder, (HasDisplayData) fn, fnDisplayData);
}
- public DoFn<InputT, OutputT> getNewFn() {
+ public DoFn<InputT, OutputT> getFn() {
return fn;
}
@@ -952,7 +952,7 @@ public class ParDo {
@Override
public PCollectionTuple expand(PCollection<? extends InputT> input) {
checkArgument(
- !isSplittable(getNewFn()),
+ !isSplittable(getFn()),
"%s does not support Splittable DoFn",
input.getPipeline().getOptions().getRunner().getName());
validateWindowType(input, fn);
@@ -965,7 +965,7 @@ public class ParDo {
// The fn will likely be an instance of an anonymous subclass
// such as DoFn<Integer, String> { }, thus will have a high-fidelity
// TypeDescriptor for the output type.
- outputs.get(mainOutputTag).setTypeDescriptor(getNewFn().getOutputTypeDescriptor());
+ outputs.get(mainOutputTag).setTypeDescriptor(getFn().getOutputTypeDescriptor());
return outputs;
}
@@ -984,13 +984,13 @@ public class ParDo {
Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
return input.getPipeline().getCoderRegistry().getDefaultCoder(
output.getTypeDescriptor(),
- getNewFn().getInputTypeDescriptor(),
+ getFn().getInputTypeDescriptor(),
inputCoder);
}
@Override
protected String getKindString() {
- Class<?> clazz = getNewFn().getClass();
+ Class<?> clazz = getFn().getClass();
if (clazz.isAnonymousClass()) {
return "AnonymousParMultiDo";
} else {
@@ -1004,7 +1004,7 @@ public class ParDo {
ParDo.populateDisplayData(builder, fn, fnDisplayData);
}
- public DoFn<InputT, OutputT> getNewFn() {
+ public DoFn<InputT, OutputT> getFn() {
return fn;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b502fc1/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index c4e9b8a..1bf2c3d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -68,7 +68,7 @@ public class AggregatorPipelineExtractorTest {
@SuppressWarnings("rawtypes")
ParDo.Bound bound = mock(ParDo.Bound.class, "Bound");
AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>();
- when(bound.getNewFn()).thenReturn(fn);
+ when(bound.getFn()).thenReturn(fn);
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn());
@@ -96,7 +96,7 @@ public class AggregatorPipelineExtractorTest {
@SuppressWarnings("rawtypes")
ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti");
AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>();
- when(bound.getNewFn()).thenReturn(fn);
+ when(bound.getFn()).thenReturn(fn);
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn());
Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -126,8 +126,8 @@ public class AggregatorPipelineExtractorTest {
@SuppressWarnings("rawtypes")
ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>();
- when(bound.getNewFn()).thenReturn(fn);
- when(otherBound.getNewFn()).thenReturn(fn);
+ when(bound.getFn()).thenReturn(fn);
+ when(otherBound.getFn()).thenReturn(fn);
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn());
@@ -162,7 +162,7 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>();
Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn());
- when(bound.getNewFn()).thenReturn(fn);
+ when(bound.getFn()).thenReturn(fn);
@SuppressWarnings("rawtypes")
ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound");
@@ -170,7 +170,7 @@ public class AggregatorPipelineExtractorTest {
AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>();
Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn());
- when(otherBound.getNewFn()).thenReturn(otherFn);
+ when(otherBound.getFn()).thenReturn(otherFn);
TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class);
when(transformNode.getTransform()).thenReturn(bound);