You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/10 22:14:45 UTC

[1/2] beam git commit: This closes #2459

Repository: beam
Updated Branches:
  refs/heads/master fc1006500 -> 8afa62398


This closes #2459


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8afa6239
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8afa6239
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8afa6239

Branch: refs/heads/master
Commit: 8afa62398d51e3b11215970b139ab839ccb2e91a
Parents: fc10065 b7d7adc
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 10 15:14:34 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 15:14:34 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |   4 +
 .../core/construction/SdkComponents.java        | 152 +++++++++++++++++++
 .../core/construction/SdkComponentsTest.java    | 131 ++++++++++++++++
 .../beam/sdk/transforms/AppliedPTransform.java  |   2 +
 4 files changed, 289 insertions(+)
----------------------------------------------------------------------



[2/2] beam git commit: Add SdkComponents

Posted by tg...@apache.org.
Add SdkComponents

This takes SDK objects and assigns IDs to them. It is effectively a
ComponentsBuilder context where a component is referred to by the
Java object which is being translated, rather than by an opaque
string or protocol buffer.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7d7adc8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7d7adc8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7d7adc8

Branch: refs/heads/master
Commit: b7d7adc879694cf5b22f80a26a46982730a483ec
Parents: fc10065
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 7 09:17:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 15:14:34 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |   4 +
 .../core/construction/SdkComponents.java        | 152 +++++++++++++++++++
 .../core/construction/SdkComponentsTest.java    | 131 ++++++++++++++++
 .../beam/sdk/transforms/AppliedPTransform.java  |   2 +
 4 files changed, 289 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 78b6819..ee64f91 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -56,6 +56,10 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-common-runner-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
new file mode 100644
index 0000000..c4b8cf1
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.base.Equivalence;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/** SDK objects that will be represented at some later point within a {@link Components} object. */
+class SdkComponents {
+  private final RunnerApi.Components.Builder componentsBuilder;
+
+  private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds;
+  private final BiMap<PCollection<?>, String> pCollectionIds;
+  private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds;
+
+  /** A map of Coder to IDs. Coders are stored here with identity equivalence. */
+  private final BiMap<Equivalence.Wrapper<? extends Coder<?>>, String> coderIds;
+  // TODO: Specify environments
+
+  /** Create a new {@link SdkComponents} with no components. */
+  static SdkComponents create() {
+    return new SdkComponents();
+  }
+
+  private SdkComponents() {
+    this.componentsBuilder = RunnerApi.Components.newBuilder();
+    this.transformIds = HashBiMap.create();
+    this.pCollectionIds = HashBiMap.create();
+    this.windowingStrategyIds = HashBiMap.create();
+    this.coderIds = HashBiMap.create();
+  }
+
+  /**
+   * Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a
+   * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same
+   * {@link AppliedPTransform} will return the same unique ID.
+   */
+  String registerPTransform(AppliedPTransform<?, ?, ?> pTransform) {
+    String existing = transformIds.get(pTransform);
+    if (existing != null) {
+      return existing;
+    }
+    String name = pTransform.getFullName();
+    if (name.isEmpty()) {
+      name = uniqify("unnamed_ptransform", transformIds.values());
+    }
+    transformIds.put(pTransform, name);
+    return name;
+  }
+
+  /**
+   * Registers the provided {@link PCollection} into this {@link SdkComponents}, returning a unique
+   * ID for the {@link PCollection}. Multiple registrations of the same {@link PCollection} will
+   * return the same unique ID.
+   */
+  String registerPCollection(PCollection<?> pCollection) {
+    String existing = pCollectionIds.get(pCollection);
+    if (existing != null) {
+      return existing;
+    }
+    String uniqueName = uniqify(pCollection.getName(), pCollectionIds.values());
+    pCollectionIds.put(pCollection, uniqueName);
+    return uniqueName;
+  }
+
+  /**
+   * Registers the provided {@link WindowingStrategy} into this {@link SdkComponents}, returning a
+   * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
+   * WindowingStrategy} will return the same unique ID.
+   */
+  String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+    String existing = windowingStrategyIds.get(windowingStrategy);
+    if (existing != null) {
+      return existing;
+    }
+    String baseName =
+        String.format(
+            "%s(%s)",
+            NameUtils.approximateSimpleName(windowingStrategy),
+            NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
+    String name = uniqify(baseName, windowingStrategyIds.values());
+    windowingStrategyIds.put(windowingStrategy, name);
+    return name;
+  }
+
+  /**
+   * Registers the provided {@link Coder} into this {@link SdkComponents}, returning a unique ID for
+   * the {@link Coder}. Multiple registrations of the same {@link Coder} will return the same
+   * unique ID.
+   *
+   * <p>Coders are stored by identity to ensure that coders which are equal according to {@link
+   * Object#equals(Object)} and {@link Object#hashCode()} but use incompatible binary formats are
+   * not considered the same coder.
+   */
+  String registerCoder(Coder<?> coder) {
+    String existing = coderIds.get(Equivalence.identity().wrap(coder));
+    if (existing != null) {
+      return existing;
+    }
+    String baseName = NameUtils.approximateSimpleName(coder);
+    String name = uniqify(baseName, coderIds.values());
+    coderIds.put(Equivalence.identity().wrap(coder), name);
+    return name;
+  }
+
+  private String uniqify(String baseName, Set<String> existing) {
+    String name = baseName;
+    int increment = 1;
+    while (existing.contains(name)) {
+      name = baseName + Integer.toString(increment);
+      increment++;
+    }
+    return name;
+  }
+
+  /**
+   * Convert this {@link SdkComponents} into a {@link RunnerApi.Components}, including all of the
+   * contained {@link Coder coders}, {@link WindowingStrategy windowing strategies}, {@link
+   * PCollection PCollections}, and {@link PTransform PTransforms}.
+   */
+  @Experimental
+  RunnerApi.Components toComponents() {
+    return componentsBuilder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
new file mode 100644
index 0000000..c96e57c
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isEmptyOrNullString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SdkComponents}. */
+@RunWith(JUnit4.class)
+public class SdkComponentsTest {
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private SdkComponents components = SdkComponents.create();
+
+  @Test
+  public void registerCoder() {
+    Coder<?> coder =
+        KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+    String id = components.registerCoder(coder);
+    assertThat(components.registerCoder(coder), equalTo(id));
+    assertThat(id, not(isEmptyOrNullString()));
+    assertThat(components.registerCoder(VarLongCoder.of()), not(equalTo(id)));
+  }
+
+  @Test
+  public void registerTransform() {
+    Create.Values<Integer> create = Create.of(1, 2, 3);
+    PCollection<Integer> pt = pipeline.apply(create);
+    String userName = "my_transform/my_nesting";
+    AppliedPTransform<?, ?, ?> transform =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            userName, pipeline.begin().expand(), pt.expand(), create, pipeline);
+    String componentName = components.registerPTransform(transform);
+    assertThat(componentName, equalTo(userName));
+    assertThat(components.registerPTransform(transform), equalTo(componentName));
+  }
+
+  @Test
+  public void registerTransformIdEmptyFullName() {
+    Create.Values<Integer> create = Create.of(1, 2, 3);
+    PCollection<Integer> pt = pipeline.apply(create);
+    AppliedPTransform<?, ?, ?> transform =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            "", pipeline.begin().expand(), pt.expand(), create, pipeline);
+    String assignedName = components.registerPTransform(transform);
+
+    assertThat(assignedName, not(isEmptyOrNullString()));
+  }
+
+  @Test
+  public void registerPCollection() {
+    PCollection<Long> pCollection = pipeline.apply(CountingInput.unbounded()).setName("foo");
+    String id = components.registerPCollection(pCollection);
+    assertThat(id, equalTo("foo"));
+  }
+
+  @Test
+  public void registerPCollectionExistingNameCollision() {
+    PCollection<Long> pCollection =
+        pipeline.apply("FirstCount", CountingInput.unbounded()).setName("foo");
+    String firstId = components.registerPCollection(pCollection);
+    PCollection<Long> duplicate =
+        pipeline.apply("SecondCount", CountingInput.unbounded()).setName("foo");
+    String secondId = components.registerPCollection(duplicate);
+    assertThat(firstId, equalTo("foo"));
+    assertThat(secondId, containsString("foo"));
+    assertThat(secondId, not(equalTo("foo")));
+  }
+
+  @Test
+  public void registerWindowingStrategy() {
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String name = components.registerWindowingStrategy(strategy);
+    assertThat(name, not(isEmptyOrNullString()));
+  }
+
+  @Test
+  public void registerWindowingStrategyIdEqualStrategies() {
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String name = components.registerWindowingStrategy(strategy);
+    String duplicateName =
+        components.registerWindowingStrategy(
+            WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES));
+    assertThat(name, equalTo(duplicateName));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index 4de81ac..e78d795 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.values.TaggedPValue;
 public abstract class AppliedPTransform<
     InputT extends PInput, OutputT extends POutput,
     TransformT extends PTransform<? super InputT, OutputT>> {
+  // To prevent extension outside of this package.
+  AppliedPTransform() {}
 
   public static <InputT extends PInput, OutputT extends POutput,
           TransformT extends PTransform<? super InputT, OutputT>>