You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/06 03:06:22 UTC
[1/3] beam git commit: Add CreatePCollectionView translation
Repository: beam
Updated Branches:
refs/heads/master 1cc6dc120 -> 6d64c6ec1
Add CreatePCollectionView translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3b036a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3b036a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3b036a2
Branch: refs/heads/master
Commit: c3b036a243c768546f0273e22fb44eaa2fcfb245
Parents: ae7bc1d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:56:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 5 19:48:27 2017 -0700
----------------------------------------------------------------------
.../CreatePCollectionViewTranslation.java | 126 +++++++++++++++++
.../construction/PTransformTranslation.java | 10 +-
.../CreatePCollectionViewTranslationTest.java | 136 +++++++++++++++++++
3 files changed, 270 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
new file mode 100644
index 0000000..aa24909
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Utility methods for translating {@link View} transforms to and from {@link RunnerApi}
+ * representations.
+ *
+ * @deprecated this should generally be done as part of {@link ParDo} translation, or moved into a
+ * dedicated runners-core-construction auxiliary class
+ */
+@Deprecated
+public class CreatePCollectionViewTranslation {
+
+ /**
+ * @deprecated Since {@link CreatePCollectionView} is not a part of the Beam model, there is no
+ * SDK-agnostic specification. Using this method means your runner is tied to Java.
+ */
+ @Deprecated
+ public static <ElemT, ViewT> PCollectionView<ViewT> getView(
+ AppliedPTransform<
+ PCollection<ElemT>, PCollectionView<ViewT>,
+ PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
+ application)
+ throws IOException {
+
+ RunnerApi.PTransform transformProto =
+ PTransformTranslation.toProto(
+ application,
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ SdkComponents.create());
+
+ checkArgument(
+ PTransformTranslation.CREATE_VIEW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+ "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"",
+ PCollectionView.class.getSimpleName(),
+ application.getTransform(),
+ application.getFullName(),
+ transformProto.getSpec().getUrn());
+
+ return (PCollectionView<ViewT>)
+ SerializableUtils.deserializeFromByteArray(
+ transformProto
+ .getSpec()
+ .getParameter()
+ .unpack(BytesValue.class)
+ .getValue()
+ .toByteArray(),
+ PCollectionView.class.getSimpleName());
+ }
+
+ @Deprecated
+ static class CreatePCollectionViewTranslator
+ implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
+ @Override
+ public String getUrn(View.CreatePCollectionView<?, ?> transform) {
+ return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
+ }
+
+ @Override
+ public FunctionSpec translate(
+ AppliedPTransform<?, ?, View.CreatePCollectionView<?, ?>> transform,
+ SdkComponents components) {
+ return FunctionSpec.newBuilder()
+ .setUrn(getUrn(transform.getTransform()))
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(
+ transform.getTransform().getView())))
+ .build()))
+ .build();
+ }
+ }
+
+ /** Registers {@link CreatePCollectionViewTranslator}. */
+ @AutoService(TransformPayloadTranslatorRegistrar.class)
+ @Deprecated
+ public static class Registrar implements TransformPayloadTranslatorRegistrar {
+ @Override
+ public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+ getTransformPayloadTranslators() {
+ return Collections.singletonMap(
+ View.CreatePCollectionView.class, new CreatePCollectionViewTranslator());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index fcbe84b..7c5c593 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -56,6 +56,9 @@ public class PTransformTranslation {
// Less well-known. And where shall these live?
public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
+ @Deprecated
+ public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
+
private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
@@ -141,9 +144,11 @@ public class PTransformTranslation {
return tag.getId();
}
+ /**
+ * Returns the URN for the transform if it is known, otherwise throws.
+ */
public static String urnForTransform(PTransform<?, ?> transform) {
- TransformPayloadTranslator translator =
- KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
+ TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
if (translator == null) {
throw new IllegalStateException(
String.format("No translator known for %s", transform.getClass().getName()));
@@ -158,6 +163,7 @@ public class PTransformTranslation {
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
String getUrn(T transform);
+
FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
new file mode 100644
index 0000000..0d209a0
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Suite;
+
+/** Tests for {@link CreatePCollectionViewTranslation}. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ CreatePCollectionViewTranslationTest.TestCreatePCollectionViewPayloadTranslation.class,
+})
+public class CreatePCollectionViewTranslationTest {
+
+ /** Tests for translating various {@link ParDo} transforms to/from {@link ParDoPayload} protos. */
+ @RunWith(Parameterized.class)
+ public static class TestCreatePCollectionViewPayloadTranslation {
+
+ // Two parameters suffice because the nature of the serialization/deserialization of
+ // the view is not what is being tested; it is just important that the round trip
+ // is not vacuous.
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<CreatePCollectionView<?, ?>> data() {
+ return ImmutableList.<CreatePCollectionView<?, ?>>of(
+ CreatePCollectionView.of(
+ PCollectionViews.singletonView(
+ testPCollection,
+ testPCollection.getWindowingStrategy(),
+ false,
+ null,
+ testPCollection.getCoder())),
+ CreatePCollectionView.of(
+ PCollectionViews.listView(
+ testPCollection,
+ testPCollection.getWindowingStrategy(),
+ testPCollection.getCoder())));
+ }
+
+ @Parameter(0)
+ public CreatePCollectionView<?, ?> createViewTransform;
+
+ public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ private static final PCollection<String> testPCollection = p.apply(Create.of("one"));
+
+ @Test
+ public void testEncodedProto() throws Exception {
+ SdkComponents components = SdkComponents.create();
+ components.registerPCollection(testPCollection);
+
+ AppliedPTransform<?, ?, ?> appliedPTransform =
+ AppliedPTransform.of(
+ "foo",
+ testPCollection.expand(),
+ createViewTransform.getView().expand(),
+ createViewTransform,
+ p);
+
+ FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec();
+
+ // Checks that the payload is what it should be
+ PCollectionView<?> deserializedView =
+ (PCollectionView<?>)
+ SerializableUtils.deserializeFromByteArray(
+ payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ PCollectionView.class.getSimpleName());
+
+ assertThat(
+ deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView()));
+ }
+
+ @Test
+ public void testExtractionDirectFromTransform() throws Exception {
+ SdkComponents components = SdkComponents.create();
+ components.registerPCollection(testPCollection);
+
+ AppliedPTransform<?, ?, ?> appliedPTransform =
+ AppliedPTransform.of(
+ "foo",
+ testPCollection.expand(),
+ createViewTransform.getView().expand(),
+ createViewTransform,
+ p);
+
+ CreatePCollectionViewTranslation.getView((AppliedPTransform) appliedPTransform);
+
+ FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec();
+
+ // Checks that the payload is what it should be
+ PCollectionView<?> deserializedView =
+ (PCollectionView<?>)
+ SerializableUtils.deserializeFromByteArray(
+ payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ PCollectionView.class.getSimpleName());
+
+ assertThat(
+ deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView()));
+ }
+ }
+}
[3/3] beam git commit: This closes #3281: [BEAM-3271] Add
CreatePCollectionView translation
Posted by ke...@apache.org.
This closes #3281: [BEAM-3271] Add CreatePCollectionView translation
Add CreatePCollectionView translation
Clarify javadoc on PTransformTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d64c6ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d64c6ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d64c6ec
Branch: refs/heads/master
Commit: 6d64c6ec1be2134b577161f2709ada4e10bfaeb0
Parents: 1cc6dc1 c3b036a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 5 19:49:33 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 5 19:49:33 2017 -0700
----------------------------------------------------------------------
.../CreatePCollectionViewTranslation.java | 126 +++++++++++++++++
.../construction/PTransformTranslation.java | 16 ++-
.../CreatePCollectionViewTranslationTest.java | 136 +++++++++++++++++++
3 files changed, 275 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Clarify javadoc on PTransformTranslation
Posted by ke...@apache.org.
Clarify javadoc on PTransformTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ae7bc1d7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ae7bc1d7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ae7bc1d7
Branch: refs/heads/master
Commit: ae7bc1d781f793d5091b70bab1c788b795866a8f
Parents: 1cc6dc1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 5 19:47:31 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 5 19:48:27 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/PTransformTranslation.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ae7bc1d7/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index fd3f9f3..fcbe84b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -123,7 +124,10 @@ public class PTransformTranslation {
}
/**
- * Translates a non-composite {@link AppliedPTransform} into a runner API proto.
+ * Translates a composite {@link AppliedPTransform} into a runner API proto with no component
+ * transforms.
+ *
+ * <p>This should not be used when translating a {@link Pipeline}.
*
* <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
*/