You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/08 19:00:45 UTC
[1/3] beam git commit: DirectRunner override matchers using Runner API
Repository: beam
Updated Branches:
refs/heads/master b53e6f0dc -> 4aef93854
DirectRunner override matchers using Runner API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8d90878
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8d90878
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8d90878
Branch: refs/heads/master
Commit: d8d9087877c01f1786271726a541fb3eeda7f939
Parents: ca7b9c2
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:31:16 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 8 11:36:28 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d8d90878/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index dbd1ec4..136ccf3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -42,12 +43,9 @@ import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -230,33 +228,33 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
new WriteWithShardingFactory())) /* Uses a view internally. */
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+ PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
new ViewOverrideFactory())) /* Uses pardos and GBKs */
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(TestStream.class),
+ PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
new DirectTestStreamFactory(this))) /* primitive */
// SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
// primitives
.add(
PTransformOverride.of(
- PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+ PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory()))
// state and timer pardos are implemented in terms of simple ParDos and extra primitives
.add(
PTransformOverride.of(
- PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+ PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory()))
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+ PTransformMatchers.urnEqualTo(
+ SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN),
new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(
- SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class),
+ PTransformMatchers.urnEqualTo(SplittableParDo.SPLITTABLE_GBKIKWI_URN),
new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(GroupByKey.class),
+ PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
.build();
}
[3/3] beam git commit: This closes #3239: [BEAM-2371] Port
DirectRunner override matchers to language-agnostic construction APIs
Posted by ke...@apache.org.
This closes #3239: [BEAM-2371] Port DirectRunner override matchers to language-agnostic construction APIs
DirectRunner override matchers using Runner API
Add Runner API oriented PTransformMatchers for DirectRunner overrides
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aef9385
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aef9385
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aef9385
Branch: refs/heads/master
Commit: 4aef9385477f0d13bdaa345cb5a629b2fd00b867
Parents: b53e6f0 d8d9087
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jun 8 11:37:15 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 8 11:37:15 2017 -0700
----------------------------------------------------------------------
.../core/construction/PTransformMatchers.java | 94 +++++++++++++++++++-
.../construction/PTransformTranslation.java | 7 +-
.../construction/PTransformMatchersTest.java | 32 +++++++
.../beam/runners/direct/DirectRunner.java | 20 ++---
4 files changed, 137 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Add Runner API oriented PTransformMatchers for
DirectRunner overrides
Posted by ke...@apache.org.
Add Runner API oriented PTransformMatchers for DirectRunner overrides
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca7b9c28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca7b9c28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca7b9c28
Branch: refs/heads/master
Commit: ca7b9c288151d318898ab000b91d26fcf62046ca
Parents: b53e6f0
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:29:09 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Jun 8 11:36:28 2017 -0700
----------------------------------------------------------------------
.../core/construction/PTransformMatchers.java | 94 +++++++++++++++++++-
.../construction/PTransformTranslation.java | 7 +-
.../construction/PTransformMatchersTest.java | 32 +++++++
3 files changed, 128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index bfe24a0..c339891 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core.construction;
import com.google.common.base.MoreObjects;
+import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
@@ -50,6 +51,34 @@ public class PTransformMatchers {
private PTransformMatchers() {}
/**
+ * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the URN of the
+ * {@link PTransform} is equal to the URN provided to this matcher.
+ */
+ public static PTransformMatcher urnEqualTo(String urn) {
+ return new EqualUrnPTransformMatcher(urn);
+ }
+
+ private static class EqualUrnPTransformMatcher implements PTransformMatcher {
+ private final String urn;
+
+ private EqualUrnPTransformMatcher(String urn) {
+ this.urn = urn;
+ }
+
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ return urn.equals(PTransformTranslation.urnForTransformOrNull(application.getTransform()));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("urn", urn)
+ .toString();
+ }
+ }
+
+ /**
* Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the
* {@link PTransform} is equal to the {@link Class} provided ot this matcher.
*/
@@ -151,6 +180,68 @@ public class PTransformMatchers {
}
/**
+ * A {@link PTransformMatcher} that matches a {@link ParDo} by URN if it has a splittable {@link
+ * DoFn}.
+ */
+ public static PTransformMatcher splittableParDo() {
+ return new PTransformMatcher() {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
+ PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
+
+ try {
+ return ParDoTranslation.isSplittable(application);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Transform with URN %s could not be translated",
+ PTransformTranslation.PAR_DO_TRANSFORM_URN),
+ e);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper("SplittableParDoMultiMatcher").toString();
+ }
+ };
+ }
+
+ /**
+ * A {@link PTransformMatcher} that matches a {@link ParDo} transform by URN
+ * and whether it contains state or timers as specified by {@link ParDoTranslation}.
+ */
+ public static PTransformMatcher stateOrTimerParDo() {
+ return new PTransformMatcher() {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ if (PTransformTranslation.PAR_DO_TRANSFORM_URN.equals(
+ PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
+
+ try {
+ return ParDoTranslation.usesStateOrTimers(application);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Transform with URN %s could not be translated",
+ PTransformTranslation.PAR_DO_TRANSFORM_URN),
+ e);
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper("StateOrTimerParDoMatcher").toString();
+ }
+ };
+ }
+
+ /**
* A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn}
* that uses state or timers, as specified by {@link DoFnSignature#usesState()} and
* {@link DoFnSignature#usesTimers()}.
@@ -268,7 +359,8 @@ public class PTransformMatchers {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
- if (application.getTransform() instanceof WriteFiles) {
+ if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals(
+ PTransformTranslation.urnForTransformOrNull(application.getTransform()))) {
WriteFiles write = (WriteFiles) application.getTransform();
return write.getSharding() == null && write.getNumShards() == null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 32ecf43..bae7b05 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -179,13 +179,12 @@ public class PTransformTranslation {
* Returns the URN for the transform if it is known, otherwise throws.
*/
public static String urnForTransform(PTransform<?, ?> transform) {
- TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
- if (translator == null) {
+ String urn = urnForTransformOrNull(transform);
+ if (urn == null) {
throw new IllegalStateException(
String.format("No translator known for %s", transform.getClass().getName()));
}
-
- return translator.getUrn(transform);
+ return urn;
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/ca7b9c28/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 2497598..6459849 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,6 +27,8 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Collections;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
@@ -95,9 +97,14 @@ public class PTransformMatchersTest implements Serializable {
PCollection<KV<String, Integer>> input =
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ input.setName("dummy input");
+ input.setCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
+
PCollection<Integer> output =
PCollection.createPrimitiveOutputInternal(
p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+ output.setName("dummy output");
+ output.setCoder(VarIntCoder.of());
return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
}
@@ -272,6 +279,18 @@ public class PTransformMatchersTest implements Serializable {
}
@Test
+ public void parDoSplittable() {
+ AppliedPTransform<?, ?, ?> parDoApplication =
+ getAppliedTransform(
+ ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+ assertThat(PTransformMatchers.splittableParDo().matches(parDoApplication), is(true));
+
+ assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+ assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+ assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+ }
+
+ @Test
public void parDoMultiWithState() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(
@@ -284,6 +303,19 @@ public class PTransformMatchersTest implements Serializable {
}
@Test
+ public void parDoWithState() {
+ AppliedPTransform<?, ?, ?> statefulApplication =
+ getAppliedTransform(
+ ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+ assertThat(PTransformMatchers.stateOrTimerParDo().matches(statefulApplication), is(true));
+
+ AppliedPTransform<?, ?, ?> splittableApplication =
+ getAppliedTransform(
+ ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+ assertThat(PTransformMatchers.stateOrTimerParDo().matches(splittableApplication), is(false));
+ }
+
+ @Test
public void parDoMultiWithTimers() {
AppliedPTransform<?, ?, ?> parDoApplication =
getAppliedTransform(