You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/01 18:18:59 UTC
[1/2] beam git commit: This closes #2689
Repository: beam
Updated Branches:
refs/heads/master fe2a32000 -> 23609efd8
This closes #2689
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23609efd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23609efd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23609efd
Branch: refs/heads/master
Commit: 23609efd83a662717f7a49821eb67ac870470bb2
Parents: fe2a320 d1e4875
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 11:18:36 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 11:18:36 2017 -0700
----------------------------------------------------------------------
.../core/construction/CoderTranslator.java | 44 ++++++++
.../core/construction/CoderTranslators.java | 107 +++++++++++++++++++
.../beam/runners/core/construction/Coders.java | 71 ++++++------
.../runners/core/construction/CodersTest.java | 18 +++-
.../beam/sdk/coders/LengthPrefixCoder.java | 4 +
5 files changed, 208 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Add A CoderTranslator Interface
Posted by tg...@apache.org.
Add A CoderTranslator Interface
This will enable the removal of StandardCoder.getComponents
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1e48750
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1e48750
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1e48750
Branch: refs/heads/master
Commit: d1e487506373fb7078650e42a307f117023316ee
Parents: fe2a320
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 17:24:27 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 11:18:36 2017 -0700
----------------------------------------------------------------------
.../core/construction/CoderTranslator.java | 44 ++++++++
.../core/construction/CoderTranslators.java | 107 +++++++++++++++++++
.../beam/runners/core/construction/Coders.java | 71 ++++++------
.../runners/core/construction/CodersTest.java | 18 +++-
.../beam/sdk/coders/LengthPrefixCoder.java | 4 +
5 files changed, 208 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java
new file mode 100644
index 0000000..26d8c1d
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * An interface that translates coders to components and back.
+ *
+ * <p>This interface is highly experimental, and incomplete. Coders must in the general case have
+ * the capability to encode an additional payload, which is not currently supported. This exists as
+ * a temporary measure.
+ */
+@Experimental(Kind.CORE_RUNNERS_ONLY)
+public interface CoderTranslator<T extends Coder<?>> {
+ /**
+ * Extract all component {@link Coder coders} within a coder.
+ */
+ List<? extends Coder<?>> getComponents(T from);
+
+ /**
+ * Create a {@link Coder} from its component {@link Coder coders}.
+ */
+ T fromComponents(List<Coder<?>> components);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
new file mode 100644
index 0000000..989a8b6
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** {@link CoderTranslator} implementations for known coder types. */
+class CoderTranslators {
+ private CoderTranslators() {}
+
+ static <T extends Coder<?>> CoderTranslator<T> atomic(final Class<T> clazz) {
+ return new CoderTranslator<T>() {
+ @Override
+ public List<? extends Coder<?>> getComponents(T from) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public T fromComponents(List<Coder<?>> components) {
+ return InstanceBuilder.ofType(clazz).build();
+ }
+ };
+ }
+
+ static CoderTranslator<KvCoder<?, ?>> kv() {
+ return new CoderTranslator<KvCoder<?, ?>>() {
+ @Override
+ public List<? extends Coder<?>> getComponents(KvCoder<?, ?> from) {
+ return ImmutableList.of(from.getKeyCoder(), from.getValueCoder());
+ }
+
+ @Override
+ public KvCoder<?, ?> fromComponents(List<Coder<?>> components) {
+ return KvCoder.of(components.get(0), components.get(1));
+ }
+ };
+ }
+
+ static CoderTranslator<IterableCoder<?>> iterable() {
+ return new CoderTranslator<IterableCoder<?>>() {
+ @Override
+ public List<? extends Coder<?>> getComponents(IterableCoder<?> from) {
+ return Collections.singletonList(from.getElemCoder());
+ }
+
+ @Override
+ public IterableCoder<?> fromComponents(List<Coder<?>> components) {
+ return IterableCoder.of(components.get(0));
+ }
+ };
+ }
+
+ static CoderTranslator<LengthPrefixCoder<?>> lengthPrefix() {
+ return new CoderTranslator<LengthPrefixCoder<?>>() {
+ @Override
+ public List<? extends Coder<?>> getComponents(LengthPrefixCoder<?> from) {
+ return Collections.singletonList(from.getValueCoder());
+ }
+
+ @Override
+ public LengthPrefixCoder<?> fromComponents(List<Coder<?>> components) {
+ return LengthPrefixCoder.of(components.get(0));
+ }
+ };
+ }
+
+ static CoderTranslator<FullWindowedValueCoder<?>> fullWindowedValue() {
+ return new CoderTranslator<FullWindowedValueCoder<?>>() {
+ @Override
+ public List<? extends Coder<?>> getComponents(FullWindowedValueCoder<?> from) {
+ return ImmutableList.of(from.getValueCoder(), from.getWindowCoder());
+ }
+
+ @Override
+ public FullWindowedValueCoder<?> fromComponents(List<Coder<?>> components) {
+ return WindowedValue.getFullCoder(
+ components.get(0), (Coder<BoundedWindow>) components.get(1));
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 094f21f..6c2caa9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
@@ -30,6 +31,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -44,7 +46,6 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
@@ -67,6 +68,22 @@ public class Coders {
.put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
.build();
+ @VisibleForTesting
+ static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>>
+ KNOWN_TRANSLATORS =
+ ImmutableMap
+ .<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>>
+ builder()
+ .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class))
+ .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class))
+ .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class))
+ .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class))
+ .put(KvCoder.class, CoderTranslators.kv())
+ .put(IterableCoder.class, CoderTranslators.iterable())
+ .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
+ .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
+ .build();
+
public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
SdkComponents components = SdkComponents.create();
RunnerApi.Coder coderProto = toProto(coder, components);
@@ -94,18 +111,26 @@ public class Coders {
coder,
coder.getClass().getName());
StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder;
- List<String> componentIds = new ArrayList<>();
- for (Coder<?> componentCoder : stdCoder.getComponents()) {
- componentIds.add(components.registerCoder(componentCoder));
- }
+ CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass());
+ List<String> componentIds = registerComponents(coder, translator, components);
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
.setSpec(
SdkFunctionSpec.newBuilder()
- .setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass()))))
+ .setSpec(
+ FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass()))))
.build();
}
+ private static <T extends Coder<?>> List<String> registerComponents(
+ T coder, CoderTranslator<T> translator, SdkComponents components) throws IOException {
+ List<String> componentIds = new ArrayList<>();
+ for (Coder<?> component : translator.getComponents(coder)) {
+ componentIds.add(components.registerCoder(component));
+ }
+ return componentIds;
+ }
+
private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws IOException {
RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder();
return coderBuilder
@@ -141,30 +166,14 @@ public class Coders {
Coder<?> innerCoder = fromProto(components.getCodersOrThrow(componentId), components);
coderComponents.add(innerCoder);
}
- switch (coderUrn) {
- case "urn:beam:coders:bytes:0.1":
- return ByteArrayCoder.of();
- case "urn:beam:coders:kv:0.1":
- return KvCoder.of(coderComponents);
- case "urn:beam:coders:varint:0.1":
- return VarLongCoder.of();
- case "urn:beam:coders:interval_window:0.1":
- return IntervalWindowCoder.of();
- case "urn:beam:coders:length_prefix:0.1":
- checkArgument(
- coderComponents.size() == 1, "Expecting 1 component, got %s", coderComponents.size());
- return LengthPrefixCoder.of(coderComponents.get(0));
- case "urn:beam:coders:stream:0.1":
- return IterableCoder.of(coderComponents);
- case "urn:beam:coders:global_window:0.1":
- return GlobalWindow.Coder.INSTANCE;
- case "urn:beam:coders:windowed_value:0.1":
- return WindowedValue.FullWindowedValueCoder.of(coderComponents);
- default:
- throw new IllegalStateException(
- String.format(
- "Unknown coder URN %s. Known URNs: %s", coderUrn, KNOWN_CODER_URNS.values()));
- }
+ Class<? extends StructuredCoder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn);
+ CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType);
+ checkArgument(
+ translator != null,
+ "Unknown Coder URN %s. Known URNs: %s",
+ coderUrn,
+ KNOWN_CODER_URNS.values());
+ return translator.fromComponents(coderComponents);
}
private static Coder<?> fromCustomCoder(
@@ -179,6 +188,6 @@ public class Coders {
.unpack(BytesValue.class)
.getValue()
.toByteArray(),
- protoCoder.getSpec().getSpec().getUrn());
+ "Custom Coder Bytes");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index ecd0fa5..32a78fa 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.core.construction;
-import static com.google.common.base.Preconditions.checkState;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
@@ -92,11 +91,20 @@ public class CodersTest {
}
Set<Class<? extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses);
missingKnownCoders.removeAll(knownCoderTests);
- checkState(
- missingKnownCoders.isEmpty(),
- "Missing validation of known coder %s in %s",
+ assertThat(
+ String.format(
+ "Missing validation of known coder %s in %s",
+ missingKnownCoders, CodersTest.class.getSimpleName()),
missingKnownCoders,
- CodersTest.class.getSimpleName());
+ Matchers.empty());
+ }
+
+ @Test
+ public void validateCoderTranslators() {
+ assertThat(
+ "Every Known Coder must have a Known Translator",
+ Coders.KNOWN_CODER_URNS.keySet(),
+ equalTo(Coders.KNOWN_TRANSLATORS.keySet()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 0972b1e..6a1f8ed 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -86,6 +86,10 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
}
+ public Coder<?> getValueCoder() {
+ return valueCoder;
+ }
+
@Override
public List<? extends Coder<?>> getCoderArguments() {
return ImmutableList.of(valueCoder);