You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:04 UTC
[15/50] beam git commit: Fix RawPTransform translation
Fix RawPTransform translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/840492d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/840492d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/840492d9
Branch: refs/heads/DSL_SQL
Commit: 840492d9d8fb3b08cfe70a525655759fc1a31fdf
Parents: 7c608c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:18:03 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 6 13:10:33 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformTranslation.java | 57 ++++++++++++++++----
runners/direct-java/pom.xml | 5 --
.../beam/runners/direct/DirectGroupByKey.java | 5 +-
.../direct/ParDoMultiOverrideFactory.java | 3 +-
.../direct/TestStreamEvaluatorFactory.java | 3 +-
.../direct/TransformEvaluatorRegistry.java | 8 +--
.../runners/direct/ViewOverrideFactory.java | 3 +-
7 files changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 7c5c593..32ecf43 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
-import com.google.protobuf.Message;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -115,7 +114,20 @@ public class PTransformTranslation {
// TODO: Display Data
PTransform<?, ?> transform = appliedPTransform.getTransform();
- if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
+ // A RawPTransform directly vends its payload. Because it will generally be
+ // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
+ if (transform instanceof RawPTransform) {
+ RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform;
+
+ if (rawPTransform.getUrn() != null) {
+ FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
+ @Nullable Any parameter = rawPTransform.getPayload();
+ if (parameter != null) {
+ payload.setParameter(parameter);
+ }
+ transformBuilder.setSpec(payload);
+ }
+ } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
FunctionSpec payload =
KNOWN_PAYLOAD_TRANSLATORS
.get(transform.getClass())
@@ -145,6 +157,25 @@ public class PTransformTranslation {
}
/**
+ * Returns the URN for the transform if it is known, otherwise {@code null}.
+ */
+ @Nullable
+ public static String urnForTransformOrNull(PTransform<?, ?> transform) {
+
+ // A RawPTransform directly vends its URN. Because it will generally be
+ // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
+ if (transform instanceof RawPTransform) {
+ return ((RawPTransform) transform).getUrn();
+ }
+
+ TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
+ if (translator == null) {
+ return null;
+ }
+ return translator.getUrn(transform);
+ }
+
+ /**
* Returns the URN for the transform if it is known, otherwise throws.
*/
public static String urnForTransform(PTransform<?, ?> transform) {
@@ -176,13 +207,14 @@ public class PTransformTranslation {
* fully expanded in the pipeline proto.
*/
public abstract static class RawPTransform<
- InputT extends PInput, OutputT extends POutput, PayloadT extends Message>
+ InputT extends PInput, OutputT extends POutput>
extends PTransform<InputT, OutputT> {
+ @Nullable
public abstract String getUrn();
@Nullable
- PayloadT getPayload() {
+ public Any getPayload() {
return null;
}
}
@@ -190,24 +222,29 @@ public class PTransformTranslation {
/**
* A translator that uses the explicit URN and payload from a {@link RawPTransform}.
*/
- public static class RawPTransformTranslator<PayloadT extends Message>
- implements TransformPayloadTranslator<RawPTransform<?, ?, PayloadT>> {
+ public static class RawPTransformTranslator
+ implements TransformPayloadTranslator<RawPTransform<?, ?>> {
@Override
- public String getUrn(RawPTransform<?, ?, PayloadT> transform) {
+ public String getUrn(RawPTransform<?, ?> transform) {
return transform.getUrn();
}
@Override
public FunctionSpec translate(
- AppliedPTransform<?, ?, RawPTransform<?, ?, PayloadT>> transform,
+ AppliedPTransform<?, ?, RawPTransform<?, ?>> transform,
SdkComponents components) {
- PayloadT payload = transform.getTransform().getPayload();
+
+ // Anonymous composites have no spec
+ if (transform.getTransform().getUrn() == null) {
+ return null;
+ }
FunctionSpec.Builder transformSpec =
FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
+ Any payload = transform.getTransform().getPayload();
if (payload != null) {
- transformSpec.setParameter(Any.pack(payload));
+ transformSpec.setParameter(payload);
}
return transformSpec.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index cba4b09..bec2113 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -208,11 +208,6 @@
</dependency>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
-
- <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index f239070..2fc0dd4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.protobuf.Message;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -74,7 +73,7 @@ class DirectGroupByKey<K, V>
static final class DirectGroupByKeyOnly<K, V>
extends PTransformTranslation.RawPTransform<
- PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, Message> {
+ PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
@Override
public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
return PCollection.createPrimitiveOutputInternal(
@@ -101,7 +100,7 @@ class DirectGroupByKey<K, V>
static final class DirectGroupAlsoByWindow<K, V>
extends PTransformTranslation.RawPTransform<
- PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, Message> {
+ PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
private final WindowingStrategy<?, ?> inputWindowingStrategy;
private final WindowingStrategy<?, ?> outputWindowingStrategy;
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index df2054b..858ea34 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
-import com.google.protobuf.Message;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
@@ -172,7 +171,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
static class StatefulParDo<K, InputT, OutputT>
extends PTransformTranslation.RawPTransform<
- PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, Message> {
+ PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;
private final transient PCollection<KV<K, InputT>> originalInput;
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index b1db58f..2da7a71 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,7 +22,6 @@ import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
-import com.google.protobuf.Message;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -185,7 +184,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1";
static class DirectTestStream<T>
- extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>, Message> {
+ extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> {
private final transient DirectRunner runner;
private final TestStream<T> original;
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index d144b20..0c907df 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -92,17 +92,17 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
.put(
DirectGroupByKey.DirectGroupByKeyOnly.class,
- new PTransformTranslation.RawPTransformTranslator<>())
+ new PTransformTranslation.RawPTransformTranslator())
.put(
DirectGroupByKey.DirectGroupAlsoByWindow.class,
new PTransformTranslation.RawPTransformTranslator())
.put(
ParDoMultiOverrideFactory.StatefulParDo.class,
- new PTransformTranslation.RawPTransformTranslator<>())
+ new PTransformTranslation.RawPTransformTranslator())
.put(
ViewOverrideFactory.WriteView.class,
- new PTransformTranslation.RawPTransformTranslator<>())
- .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator<>())
+ new PTransformTranslation.RawPTransformTranslator())
+ .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator())
.put(
SplittableParDoViaKeyedWorkItems.ProcessElements.class,
new SplittableParDoProcessElementsTranslator())
http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 501b436..fdff63d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
-import com.google.protobuf.Message;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -95,7 +94,7 @@ class ViewOverrideFactory<ElemT, ViewT>
* to {@link ViewT}.
*/
static final class WriteView<ElemT, ViewT>
- extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>, Message> {
+ extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
private final CreatePCollectionView<ElemT, ViewT> og;
WriteView(CreatePCollectionView<ElemT, ViewT> og) {