You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/10 03:14:02 UTC
[3/5] beam git commit: Port ViewOverrideFactory to SDK-agnostic APIs
Port ViewOverrideFactory to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c5b57ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c5b57ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c5b57ea
Branch: refs/heads/master
Commit: 8c5b57ea8445cd50a35c6dffb460dcf0f426e700
Parents: b4c7716
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:26:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jun 9 19:56:52 2017 -0700
----------------------------------------------------------------------
.../CreatePCollectionViewTranslation.java | 4 +-
.../runners/direct/ViewOverrideFactory.java | 48 ++++++++++++--------
.../direct/ViewEvaluatorFactoryTest.java | 3 +-
.../runners/direct/ViewOverrideFactoryTest.java | 23 ++++++++--
.../beam/sdk/values/PCollectionViews.java | 10 ++++
5 files changed, 62 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
index aa24909..8fc99b9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -56,8 +56,8 @@ public class CreatePCollectionViewTranslation {
@Deprecated
public static <ElemT, ViewT> PCollectionView<ViewT> getView(
AppliedPTransform<
- PCollection<ElemT>, PCollectionView<ViewT>,
- PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
+ PCollection<ElemT>, PCollection<ElemT>,
+ PTransform<PCollection<ElemT>, PCollection<ElemT>>>
application)
throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 06a7388..5dcf016 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,8 +18,9 @@
package org.apache.beam.runners.direct;
+import java.io.IOException;
import java.util.Map;
-import org.apache.beam.runners.core.construction.ForwardingPTransform;
+import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -43,16 +44,30 @@ import org.apache.beam.sdk.values.TupleTag;
*/
class ViewOverrideFactory<ElemT, ViewT>
implements PTransformOverrideFactory<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+ PCollection<ElemT>, PCollection<ElemT>,
+ PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
@Override
public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
AppliedPTransform<
- PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+ PCollection<ElemT>, PCollection<ElemT>,
+ PTransform<PCollection<ElemT>, PCollection<ElemT>>>
transform) {
- return PTransformReplacement.of(
+
+ PCollectionView<ViewT> view;
+ try {
+ view = CreatePCollectionViewTranslation.getView(transform);
+ } catch (IOException exc) {
+ throw new RuntimeException(
+ String.format(
+ "Could not extract %s from transform %s",
+ PCollectionView.class.getSimpleName(), transform),
+ exc);
+ }
+
+ return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new GroupAndWriteView<>(transform.getTransform()));
+ new GroupAndWriteView<ElemT, ViewT>(view));
}
@Override
@@ -63,11 +78,11 @@ class ViewOverrideFactory<ElemT, ViewT>
/** The {@link DirectRunner} composite override for {@link CreatePCollectionView}. */
static class GroupAndWriteView<ElemT, ViewT>
- extends ForwardingPTransform<PCollection<ElemT>, PCollection<ElemT>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
+ extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+ private final PCollectionView<ViewT> view;
- private GroupAndWriteView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
+ private GroupAndWriteView(PCollectionView<ViewT> view) {
+ this.view = view;
}
@Override
@@ -77,14 +92,9 @@ class ViewOverrideFactory<ElemT, ViewT>
.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()))
.apply(GroupByKey.<Void, ElemT>create())
.apply(Values.<Iterable<ElemT>>create())
- .apply(new WriteView<ElemT, ViewT>(og));
+ .apply(new WriteView<ElemT, ViewT>(view));
return input;
}
-
- @Override
- protected PTransform<PCollection<ElemT>, PCollection<ElemT>> delegate() {
- return og;
- }
}
/**
@@ -96,10 +106,10 @@ class ViewOverrideFactory<ElemT, ViewT>
*/
static final class WriteView<ElemT, ViewT>
extends RawPTransform<PCollection<Iterable<ElemT>>, PCollection<Iterable<ElemT>>> {
- private final CreatePCollectionView<ElemT, ViewT> og;
+ private final PCollectionView<ViewT> view;
- WriteView(CreatePCollectionView<ElemT, ViewT> og) {
- this.og = og;
+ WriteView(PCollectionView<ViewT> view) {
+ this.view = view;
}
@Override
@@ -112,7 +122,7 @@ class ViewOverrideFactory<ElemT, ViewT>
@SuppressWarnings("deprecation")
public PCollectionView<ViewT> getView() {
- return og.getView();
+ return view;
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index ad1aecc..5bc48b7 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -66,7 +66,8 @@ public class ViewEvaluatorFactoryTest {
.apply(GroupByKey.<Void, String>create())
.apply(Values.<Iterable<String>>create());
PCollection<Iterable<String>> view =
- concat.apply(new ViewOverrideFactory.WriteView<>(createView));
+ concat.apply(
+ new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView()));
EvaluationContext context = mock(EvaluationContext.class);
TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 94728c7..6af9273 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -36,8 +37,11 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
@@ -67,7 +71,7 @@ public class ViewOverrideFactoryTest implements Serializable {
factory.getReplacementTransform(
AppliedPTransform
.<PCollection<Integer>, PCollection<Integer>,
- CreatePCollectionView<Integer, List<Integer>>>
+ PTransform<PCollection<Integer>, PCollection<Integer>>>
of(
"foo",
ints.expand(),
@@ -102,7 +106,7 @@ public class ViewOverrideFactoryTest implements Serializable {
factory.getReplacementTransform(
AppliedPTransform
.<PCollection<Integer>, PCollection<Integer>,
- CreatePCollectionView<Integer, List<Integer>>>
+ PTransform<PCollection<Integer>, PCollection<Integer>>>
of(
"foo",
ints.expand(),
@@ -120,8 +124,19 @@ public class ViewOverrideFactoryTest implements Serializable {
"There should only be one WriteView primitive in the graph",
writeViewVisited.getAndSet(true),
is(false));
- PCollectionView replacementView = ((WriteView) node.getTransform()).getView();
- assertThat(replacementView, Matchers.<PCollectionView>theInstance(view));
+ PCollectionView<?> replacementView = ((WriteView) node.getTransform()).getView();
+
+ // replacementView.getPCollection() is null here, but a non-null PCollection is not
+ // required of the replacement view, so it is not asserted one way or the other
+ assertThat(
+ replacementView.getTagInternal(),
+ equalTo(view.getTagInternal()));
+ assertThat(
+ replacementView.getViewFn(),
+ Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn()));
+ assertThat(
+ replacementView.getWindowMappingFn(),
+ Matchers.<WindowMappingFn<?>>equalTo(view.getWindowMappingFn()));
assertThat(node.getInputs().entrySet(), hasSize(1));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8c5b57ea/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 5e2e2c3..0c04370 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -282,6 +282,16 @@ public class PCollectionViews {
}
}));
}
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof ListViewFn;
+ }
+
+ @Override
+ public int hashCode() {
+ return ListViewFn.class.hashCode();
+ }
}
/**