You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/26 16:23:53 UTC
[2/4] beam git commit: Update Apex Overrides
Update Apex Overrides
Only override CreatePCollectionView transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8eb09aad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8eb09aad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8eb09aad
Branch: refs/heads/master
Commit: 8eb09aad9c975f787ba8afac83394cc8b56eb94f
Parents: bd1dfdf
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 25 10:41:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 119 ++++---------------
1 file changed, 21 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8eb09aad/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f91d8e5..c595b3f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -65,7 +64,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.AsIterable;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
@@ -111,16 +110,12 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
new PrimitiveCreate.Factory()))
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new StreamingViewAsSingleton.Factory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(View.AsIterable.class),
+ PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
new StreamingViewAsIterable.Factory()))
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new StreamingCombineGloballyAsSingletonView.Factory()))
+ PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
+ new StreamingWrapSingletonInList.Factory()))
.add(
PTransformOverride.of(
PTransformMatchers.splittableParDoMulti(),
@@ -245,117 +240,45 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
}
- private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+ private static class StreamingWrapSingletonInList<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
private static final long serialVersionUID = 1L;
- Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+ CreatePCollectionView<T, T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
- private StreamingCombineGloballyAsSingletonView(
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ private StreamingWrapSingletonInList(
+ CreatePCollectionView<T, T> transform) {
this.transform = transform;
}
@Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined = input
- .apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults().withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(combined,
- combined.getWindowingStrategy(), transform.getInsertDefault(),
- transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
+ public PCollectionView<T> expand(PCollection<T> input) {
+ return input
+ .apply(ParDo.of(new WrapAsList<T>()))
+ .apply(CreateApexPCollectionView.<T, T>of(transform.getView()));
}
@Override
protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
+ return "StreamingWrapSingletonInList";
}
- static class Factory<InputT, OutputT>
+ static class Factory<T>
extends SingleInputOutputOverrideFactory<
- PCollection<InputT>, PCollectionView<OutputT>,
- Combine.GloballyAsSingletonView<InputT, OutputT>> {
+ PCollection<T>, PCollectionView<T>,
+ CreatePCollectionView<T, T>> {
@Override
- public PTransformReplacement<PCollection<InputT>, PCollectionView<OutputT>>
+ public PTransformReplacement<PCollection<T>, PCollectionView<T>>
getReplacementTransform(
AppliedPTransform<
- PCollection<InputT>, PCollectionView<OutputT>,
- GloballyAsSingletonView<InputT, OutputT>>
+ PCollection<T>, PCollectionView<T>,
+ CreatePCollectionView<T, T>>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new StreamingCombineGloballyAsSingletonView<>(transform.getTransform()));
- }
- }
- }
-
- private static class StreamingViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
- private static final long serialVersionUID = 1L;
-
- private View.AsSingleton<T> transform;
-
- public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine
- .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
-
- static class Factory<T>
- extends SingleInputOutputOverrideFactory<
- PCollection<T>, PCollectionView<T>, View.AsSingleton<T>> {
- @Override
- public PTransformReplacement<PCollection<T>, PCollectionView<T>> getReplacementTransform(
- AppliedPTransform<PCollection<T>, PCollectionView<T>, AsSingleton<T>> transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- new StreamingViewAsSingleton<>(transform.getTransform()));
+ new StreamingWrapSingletonInList<>(transform.getTransform()));
}
}
}