You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/10 22:14:46 UTC
[2/2] beam git commit: Add SdkComponents
Add SdkComponents
This takes SDK objects and assigns IDs to them. It is effectively a
ComponentsBuilder context where a component is referred to by the
Java object which is being translated, rather than by an opaque
string or protocol buffer.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7d7adc8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7d7adc8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7d7adc8
Branch: refs/heads/master
Commit: b7d7adc879694cf5b22f80a26a46982730a483ec
Parents: fc10065
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 7 09:17:19 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 15:14:34 2017 -0700
----------------------------------------------------------------------
runners/core-construction-java/pom.xml | 4 +
.../core/construction/SdkComponents.java | 152 +++++++++++++++++++
.../core/construction/SdkComponentsTest.java | 131 ++++++++++++++++
.../beam/sdk/transforms/AppliedPTransform.java | 2 +
4 files changed, 289 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 78b6819..ee64f91 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -56,6 +56,10 @@
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-common-runner-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
new file mode 100644
index 0000000..c4b8cf1
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.base.Equivalence;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Assigns unique string IDs to SDK objects that will be represented at some later point within a
+ * {@link Components} object.
+ *
+ * <p>Each kind of component (transform, PCollection, windowing strategy, coder) has its own ID
+ * namespace. Registering a component that is already present (by equality, or by identity for
+ * coders) returns the ID that was assigned the first time.
+ */
+class SdkComponents {
+  private final RunnerApi.Components.Builder componentsBuilder;
+
+  private final BiMap<AppliedPTransform<?, ?, ?>, String> transformIds;
+  private final BiMap<PCollection<?>, String> pCollectionIds;
+  private final BiMap<WindowingStrategy<?, ?>, String> windowingStrategyIds;
+
+  /** A map of Coder to IDs. Coders are stored here with identity equivalence. */
+  private final BiMap<Equivalence.Wrapper<? extends Coder<?>>, String> coderIds;
+  // TODO: Specify environments
+
+  /** Create a new {@link SdkComponents} with no components. */
+  static SdkComponents create() {
+    return new SdkComponents();
+  }
+
+  private SdkComponents() {
+    this.componentsBuilder = RunnerApi.Components.newBuilder();
+    this.transformIds = HashBiMap.create();
+    this.pCollectionIds = HashBiMap.create();
+    this.windowingStrategyIds = HashBiMap.create();
+    this.coderIds = HashBiMap.create();
+  }
+
+  /**
+   * Registers the provided {@link AppliedPTransform} into this {@link SdkComponents}, returning a
+   * unique ID for the {@link AppliedPTransform}. Multiple registrations of the same
+   * {@link AppliedPTransform} will return the same unique ID.
+   *
+   * <p>The ID is based on the transform's full name, or on {@code "unnamed_ptransform"} when the
+   * full name is empty. If that ID is already assigned to a different transform, a numeric suffix
+   * is appended to make it unique.
+   */
+  String registerPTransform(AppliedPTransform<?, ?, ?> pTransform) {
+    String existing = transformIds.get(pTransform);
+    if (existing != null) {
+      return existing;
+    }
+    String baseName = pTransform.getFullName();
+    if (baseName.isEmpty()) {
+      baseName = "unnamed_ptransform";
+    }
+    // Always uniquify: BiMap values must be unique, and putting a value that is already bound to
+    // another key makes HashBiMap#put throw IllegalArgumentException.
+    String name = uniqify(baseName, transformIds.values());
+    transformIds.put(pTransform, name);
+    return name;
+  }
+
+  /**
+   * Registers the provided {@link PCollection} into this {@link SdkComponents}, returning a unique
+   * ID for the {@link PCollection}. Multiple registrations of the same {@link PCollection} will
+   * return the same unique ID.
+   *
+   * <p>The ID is based on the name of the {@link PCollection}, with a numeric suffix appended if
+   * that name is already in use.
+   */
+  String registerPCollection(PCollection<?> pCollection) {
+    String existing = pCollectionIds.get(pCollection);
+    if (existing != null) {
+      return existing;
+    }
+    String uniqueName = uniqify(pCollection.getName(), pCollectionIds.values());
+    pCollectionIds.put(pCollection, uniqueName);
+    return uniqueName;
+  }
+
+  /**
+   * Registers the provided {@link WindowingStrategy} into this {@link SdkComponents}, returning a
+   * unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
+   * WindowingStrategy} will return the same unique ID.
+   *
+   * <p>The ID is based on the approximate simple names of the strategy and of its window function,
+   * formatted as {@code Strategy(WindowFn)}, with a numeric suffix appended if already in use.
+   */
+  String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+    String existing = windowingStrategyIds.get(windowingStrategy);
+    if (existing != null) {
+      return existing;
+    }
+    String baseName =
+        String.format(
+            "%s(%s)",
+            NameUtils.approximateSimpleName(windowingStrategy),
+            NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
+    String name = uniqify(baseName, windowingStrategyIds.values());
+    windowingStrategyIds.put(windowingStrategy, name);
+    return name;
+  }
+
+  /**
+   * Registers the provided {@link Coder} into this {@link SdkComponents}, returning a unique ID for
+   * the {@link Coder}. Multiple registrations of the same {@link Coder} will return the same
+   * unique ID.
+   *
+   * <p>Coders are stored by identity to ensure that coders with implementations of {@link
+   * Object#equals(Object)} and {@link Object#hashCode()} but incompatible binary formats are not
+   * considered the same coder.
+   */
+  String registerCoder(Coder<?> coder) {
+    String existing = coderIds.get(Equivalence.identity().wrap(coder));
+    if (existing != null) {
+      return existing;
+    }
+    String baseName = NameUtils.approximateSimpleName(coder);
+    String name = uniqify(baseName, coderIds.values());
+    coderIds.put(Equivalence.identity().wrap(coder), name);
+    return name;
+  }
+
+  /**
+   * Returns a name derived from {@code baseName} that is not contained in {@code existing}.
+   *
+   * <p>If {@code baseName} itself is unused it is returned unchanged; otherwise the integers 1, 2,
+   * 3, ... are appended in turn until an unused name is found.
+   */
+  private static String uniqify(String baseName, Set<String> existing) {
+    String name = baseName;
+    int increment = 1;
+    while (existing.contains(name)) {
+      name = baseName + Integer.toString(increment);
+      increment++;
+    }
+    return name;
+  }
+
+  /**
+   * Convert this {@link SdkComponents} into a {@link RunnerApi.Components}.
+   *
+   * <p>NOTE: nothing in this class currently adds the registered {@link Coder coders}, {@link
+   * WindowingStrategy windowing strategies}, {@link PCollection PCollections}, or {@link PTransform
+   * PTransforms} to the underlying builder, so the returned message only reflects the builder's
+   * initial state.
+   */
+  @Experimental
+  RunnerApi.Components toComponents() {
+    return componentsBuilder.build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
new file mode 100644
index 0000000..c96e57c
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isEmptyOrNullString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests covering ID assignment in {@link SdkComponents}. */
+@RunWith(JUnit4.class)
+public class SdkComponentsTest {
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private SdkComponents components = SdkComponents.create();
+
+  /** A coder keeps its ID when registered again; a different coder receives a different ID. */
+  @Test
+  public void registerCoder() {
+    Coder<?> nestedCoder =
+        KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
+    String firstId = components.registerCoder(nestedCoder);
+    String repeatedId = components.registerCoder(nestedCoder);
+    assertThat(repeatedId, equalTo(firstId));
+    assertThat(firstId, not(isEmptyOrNullString()));
+    String otherId = components.registerCoder(VarLongCoder.of());
+    assertThat(otherId, not(equalTo(firstId)));
+  }
+
+  /** A named transform is registered under its full name, and re-registration is stable. */
+  @Test
+  public void registerTransform() {
+    Create.Values<Integer> values = Create.of(1, 2, 3);
+    PCollection<Integer> output = pipeline.apply(values);
+    String fullName = "my_transform/my_nesting";
+    AppliedPTransform<?, ?, ?> applied =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            fullName, pipeline.begin().expand(), output.expand(), values, pipeline);
+    String firstId = components.registerPTransform(applied);
+    assertThat(firstId, equalTo(fullName));
+    String repeatedId = components.registerPTransform(applied);
+    assertThat(repeatedId, equalTo(firstId));
+  }
+
+  /** A transform whose full name is empty still receives a non-empty ID. */
+  @Test
+  public void registerTransformIdEmptyFullName() {
+    Create.Values<Integer> values = Create.of(1, 2, 3);
+    PCollection<Integer> output = pipeline.apply(values);
+    AppliedPTransform<?, ?, ?> unnamed =
+        AppliedPTransform.<PBegin, PCollection<Integer>, Create.Values<Integer>>of(
+            "", pipeline.begin().expand(), output.expand(), values, pipeline);
+    String generatedId = components.registerPTransform(unnamed);
+
+    assertThat(generatedId, not(isEmptyOrNullString()));
+  }
+
+  /** A PCollection with an unused name is registered under exactly that name. */
+  @Test
+  public void registerPCollection() {
+    PCollection<Long> named = pipeline.apply(CountingInput.unbounded()).setName("foo");
+    String assignedId = components.registerPCollection(named);
+    assertThat(assignedId, equalTo("foo"));
+  }
+
+  /** Two distinct PCollections sharing a name receive distinct IDs derived from that name. */
+  @Test
+  public void registerPCollectionExistingNameCollision() {
+    PCollection<Long> original =
+        pipeline.apply("FirstCount", CountingInput.unbounded()).setName("foo");
+    String originalId = components.registerPCollection(original);
+    PCollection<Long> sameName =
+        pipeline.apply("SecondCount", CountingInput.unbounded()).setName("foo");
+    String collidingId = components.registerPCollection(sameName);
+    assertThat(originalId, equalTo("foo"));
+    assertThat(collidingId, containsString("foo"));
+    assertThat(collidingId, not(equalTo("foo")));
+  }
+
+  /** Registering a windowing strategy yields a non-empty ID. */
+  @Test
+  public void registerWindowingStrategy() {
+    WindowingStrategy<?, ?> accumulating =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String assignedId = components.registerWindowingStrategy(accumulating);
+    assertThat(assignedId, not(isEmptyOrNullString()));
+  }
+
+  /** Two separately built but equal windowing strategies are assigned the same ID. */
+  @Test
+  public void registerWindowingStrategyIdEqualStrategies() {
+    WindowingStrategy<?, ?> first =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String firstId = components.registerWindowingStrategy(first);
+    WindowingStrategy<?, ?> equalCopy =
+        WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+    String equalCopyId = components.registerWindowingStrategy(equalCopy);
+    assertThat(firstId, equalTo(equalCopyId));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7d7adc8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index 4de81ac..e78d795 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.values.TaggedPValue;
public abstract class AppliedPTransform<
InputT extends PInput, OutputT extends POutput,
TransformT extends PTransform<? super InputT, OutputT>> {
+ // To prevent extension outside of this package.
+ AppliedPTransform() {}
public static <InputT extends PInput, OutputT extends POutput,
TransformT extends PTransform<? super InputT, OutputT>>