You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/06 03:06:22 UTC

[1/3] beam git commit: Add CreatePCollectionView translation

Repository: beam
Updated Branches:
  refs/heads/master 1cc6dc120 -> 6d64c6ec1


Add CreatePCollectionView translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3b036a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3b036a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3b036a2

Branch: refs/heads/master
Commit: c3b036a243c768546f0273e22fb44eaa2fcfb245
Parents: ae7bc1d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 25 06:56:23 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 5 19:48:27 2017 -0700

----------------------------------------------------------------------
 .../CreatePCollectionViewTranslation.java       | 126 +++++++++++++++++
 .../construction/PTransformTranslation.java     |  10 +-
 .../CreatePCollectionViewTranslationTest.java   | 136 +++++++++++++++++++
 3 files changed, 270 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
new file mode 100644
index 0000000..aa24909
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * Utility methods for translating a {@link View} transforms to and from {@link RunnerApi}
+ * representations.
+ *
+ * @deprecated this should generally be done as part of {@link ParDo} translation, or moved into a
+ *     dedicated runners-core-construction auxiliary class
+ */
+@Deprecated
+public class CreatePCollectionViewTranslation {
+
+  /**
+   * @deprecated Since {@link CreatePCollectionView} is not a part of the Beam model, there is no
+   *     SDK-agnostic specification. Using this method means your runner is tied to Java.
+   */
+  @Deprecated
+  public static <ElemT, ViewT> PCollectionView<ViewT> getView(
+      AppliedPTransform<
+              PCollection<ElemT>, PCollectionView<ViewT>,
+              PTransform<PCollection<ElemT>, PCollectionView<ViewT>>>
+          application)
+      throws IOException {
+
+    RunnerApi.PTransform transformProto =
+        PTransformTranslation.toProto(
+            application,
+            Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+            SdkComponents.create());
+
+    checkArgument(
+        PTransformTranslation.CREATE_VIEW_TRANSFORM_URN.equals(transformProto.getSpec().getUrn()),
+        "Illegal attempt to extract %s from transform %s with name \"%s\" and URN \"%s\"",
+        PCollectionView.class.getSimpleName(),
+        application.getTransform(),
+        application.getFullName(),
+        transformProto.getSpec().getUrn());
+
+    return (PCollectionView<ViewT>)
+        SerializableUtils.deserializeFromByteArray(
+            transformProto
+                .getSpec()
+                .getParameter()
+                .unpack(BytesValue.class)
+                .getValue()
+                .toByteArray(),
+            PCollectionView.class.getSimpleName());
+  }
+
+  @Deprecated
+  static class CreatePCollectionViewTranslator
+      implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
+    @Override
+    public String getUrn(View.CreatePCollectionView<?, ?> transform) {
+      return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, View.CreatePCollectionView<?, ?>> transform,
+        SdkComponents components) {
+      return FunctionSpec.newBuilder()
+          .setUrn(getUrn(transform.getTransform()))
+          .setParameter(
+              Any.pack(
+                  BytesValue.newBuilder()
+                      .setValue(
+                          ByteString.copyFrom(
+                              SerializableUtils.serializeToByteArray(
+                                  transform.getTransform().getView())))
+                      .build()))
+          .build();
+    }
+  }
+
+  /** Registers {@link CreatePCollectionViewTranslator}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  @Deprecated
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(
+          View.CreatePCollectionView.class, new CreatePCollectionViewTranslator());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index fcbe84b..7c5c593 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -56,6 +56,9 @@ public class PTransformTranslation {
   // Less well-known. And where shall these live?
   public static final String WRITE_FILES_TRANSFORM_URN = "urn:beam:transform:write_files:0.1";
 
+  @Deprecated
+  public static final String CREATE_VIEW_TRANSFORM_URN = "urn:beam:transform:create_view:v1";
+
   private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
       KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
 
@@ -141,9 +144,11 @@ public class PTransformTranslation {
     return tag.getId();
   }
 
+  /**
+   * Returns the URN for the transform if it is known, otherwise throws.
+   */
   public static String urnForTransform(PTransform<?, ?> transform) {
-    TransformPayloadTranslator translator =
-    KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
+    TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
     if (translator == null) {
       throw new IllegalStateException(
           String.format("No translator known for %s", transform.getClass().getName()));
@@ -158,6 +163,7 @@ public class PTransformTranslation {
    */
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
     String getUrn(T transform);
+
     FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
         throws IOException;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c3b036a2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
new file mode 100644
index 0000000..0d209a0
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.BytesValue;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Suite;
+
+/** Tests for {@link CreatePCollectionViewTranslation}. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+  CreatePCollectionViewTranslationTest.TestCreatePCollectionViewPayloadTranslation.class,
+})
+public class CreatePCollectionViewTranslationTest {
+
+  /** Tests for translating various {@link ParDo} transforms to/from {@link ParDoPayload} protos. */
+  @RunWith(Parameterized.class)
+  public static class TestCreatePCollectionViewPayloadTranslation {
+
+    // Two parameters suffices because the nature of the serialization/deserialization of
+    // the view is not what is being tested; it is just important that the round trip
+    // is not vacuous.
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<CreatePCollectionView<?, ?>> data() {
+      return ImmutableList.<CreatePCollectionView<?, ?>>of(
+          CreatePCollectionView.of(
+              PCollectionViews.singletonView(
+                  testPCollection,
+                  testPCollection.getWindowingStrategy(),
+                  false,
+                  null,
+                  testPCollection.getCoder())),
+          CreatePCollectionView.of(
+              PCollectionViews.listView(
+                  testPCollection,
+                  testPCollection.getWindowingStrategy(),
+                  testPCollection.getCoder())));
+    }
+
+    @Parameter(0)
+    public CreatePCollectionView<?, ?> createViewTransform;
+
+    public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+    private static final PCollection<String> testPCollection = p.apply(Create.of("one"));
+
+    @Test
+    public void testEncodedProto() throws Exception {
+      SdkComponents components = SdkComponents.create();
+      components.registerPCollection(testPCollection);
+
+      AppliedPTransform<?, ?, ?> appliedPTransform =
+          AppliedPTransform.of(
+              "foo",
+              testPCollection.expand(),
+              createViewTransform.getView().expand(),
+              createViewTransform,
+              p);
+
+      FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec();
+
+      // Checks that the payload is what it should be
+      PCollectionView<?> deserializedView =
+          (PCollectionView<?>)
+              SerializableUtils.deserializeFromByteArray(
+                  payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+                  PCollectionView.class.getSimpleName());
+
+      assertThat(
+          deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView()));
+    }
+
+    @Test
+    public void testExtractionDirectFromTransform() throws Exception {
+      SdkComponents components = SdkComponents.create();
+      components.registerPCollection(testPCollection);
+
+      AppliedPTransform<?, ?, ?> appliedPTransform =
+          AppliedPTransform.of(
+              "foo",
+              testPCollection.expand(),
+              createViewTransform.getView().expand(),
+              createViewTransform,
+              p);
+
+      CreatePCollectionViewTranslation.getView((AppliedPTransform) appliedPTransform);
+
+      FunctionSpec payload = PTransformTranslation.toProto(appliedPTransform, components).getSpec();
+
+      // Checks that the payload is what it should be
+      PCollectionView<?> deserializedView =
+          (PCollectionView<?>)
+              SerializableUtils.deserializeFromByteArray(
+                  payload.getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+                  PCollectionView.class.getSimpleName());
+
+      assertThat(
+          deserializedView, Matchers.<PCollectionView<?>>equalTo(createViewTransform.getView()));
+    }
+  }
+}


[3/3] beam git commit: This closes #3281: [BEAM-3271] Add CreatePCollectionView translation

Posted by ke...@apache.org.
This closes #3281: [BEAM-3271] Add CreatePCollectionView translation

  Add CreatePCollectionView translation
  Clarify javadoc on PTransformTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6d64c6ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6d64c6ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6d64c6ec

Branch: refs/heads/master
Commit: 6d64c6ec1be2134b577161f2709ada4e10bfaeb0
Parents: 1cc6dc1 c3b036a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 5 19:49:33 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 5 19:49:33 2017 -0700

----------------------------------------------------------------------
 .../CreatePCollectionViewTranslation.java       | 126 +++++++++++++++++
 .../construction/PTransformTranslation.java     |  16 ++-
 .../CreatePCollectionViewTranslationTest.java   | 136 +++++++++++++++++++
 3 files changed, 275 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Clarify javadoc on PTransformTranslation

Posted by ke...@apache.org.
Clarify javadoc on PTransformTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ae7bc1d7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ae7bc1d7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ae7bc1d7

Branch: refs/heads/master
Commit: ae7bc1d781f793d5091b70bab1c788b795866a8f
Parents: 1cc6dc1
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jun 5 19:47:31 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jun 5 19:48:27 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/construction/PTransformTranslation.java  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ae7bc1d7/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index fd3f9f3..fcbe84b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -123,7 +124,10 @@ public class PTransformTranslation {
   }
 
   /**
-   * Translates a non-composite {@link AppliedPTransform} into a runner API proto.
+   * Translates a composite {@link AppliedPTransform} into a runner API proto with no component
+   * transforms.
+   *
+   * <p>This should not be used when translating a {@link Pipeline}.
    *
    * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
    */