You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/01 18:18:59 UTC

[1/2] beam git commit: This closes #2689

Repository: beam
Updated Branches:
  refs/heads/master fe2a32000 -> 23609efd8


This closes #2689


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23609efd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23609efd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23609efd

Branch: refs/heads/master
Commit: 23609efd83a662717f7a49821eb67ac870470bb2
Parents: fe2a320 d1e4875
Author: Thomas Groh <tg...@google.com>
Authored: Mon May 1 11:18:36 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 11:18:36 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CoderTranslator.java      |  44 ++++++++
 .../core/construction/CoderTranslators.java     | 107 +++++++++++++++++++
 .../beam/runners/core/construction/Coders.java  |  71 ++++++------
 .../runners/core/construction/CodersTest.java   |  18 +++-
 .../beam/sdk/coders/LengthPrefixCoder.java      |   4 +
 5 files changed, 208 insertions(+), 36 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Add A CoderTranslator Interface

Posted by tg...@apache.org.
Add A CoderTranslator Interface

This will enable the removal of StandardCoder.getComponents


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1e48750
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1e48750
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1e48750

Branch: refs/heads/master
Commit: d1e487506373fb7078650e42a307f117023316ee
Parents: fe2a320
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 25 17:24:27 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon May 1 11:18:36 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CoderTranslator.java      |  44 ++++++++
 .../core/construction/CoderTranslators.java     | 107 +++++++++++++++++++
 .../beam/runners/core/construction/Coders.java  |  71 ++++++------
 .../runners/core/construction/CodersTest.java   |  18 +++-
 .../beam/sdk/coders/LengthPrefixCoder.java      |   4 +
 5 files changed, 208 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java
new file mode 100644
index 0000000..26d8c1d
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+
+/**
+ * An interface that translates coders to components and back.
+ *
+ * <p>This interface is highly experimental, and incomplete. Coders must in the general case have
+ * the capability to encode an additional payload, which is not currently supported. This exists as
+ * a temporary measure.
+ */
+@Experimental(Kind.CORE_RUNNERS_ONLY)
+public interface CoderTranslator<T extends Coder<?>> {
+  /**
+  * Extract all component {@link Coder coders} within a coder.
+   */
+  List<? extends Coder<?>> getComponents(T from);
+
+  /**
+   * Create a {@link Coder} from its component {@link Coder coders}.
+   */
+  T fromComponents(List<Coder<?>> components);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
new file mode 100644
index 0000000..989a8b6
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+
+/** {@link CoderTranslator} implementations for known coder types. */
+class CoderTranslators {
+  private CoderTranslators() {}
+
+  static <T extends Coder<?>> CoderTranslator<T> atomic(final Class<T> clazz) {
+    return new CoderTranslator<T>() {
+      @Override
+      public List<? extends Coder<?>> getComponents(T from) {
+        return Collections.emptyList();
+      }
+
+      @Override
+      public T fromComponents(List<Coder<?>> components) {
+        return InstanceBuilder.ofType(clazz).build();
+      }
+    };
+  }
+
+  static CoderTranslator<KvCoder<?, ?>> kv() {
+    return new CoderTranslator<KvCoder<?, ?>>() {
+      @Override
+      public List<? extends Coder<?>> getComponents(KvCoder<?, ?> from) {
+        return ImmutableList.of(from.getKeyCoder(), from.getValueCoder());
+      }
+
+      @Override
+      public KvCoder<?, ?> fromComponents(List<Coder<?>> components) {
+        return KvCoder.of(components.get(0), components.get(1));
+      }
+    };
+  }
+
+  static CoderTranslator<IterableCoder<?>> iterable() {
+    return new CoderTranslator<IterableCoder<?>>() {
+      @Override
+      public List<? extends Coder<?>> getComponents(IterableCoder<?> from) {
+        return Collections.singletonList(from.getElemCoder());
+      }
+
+      @Override
+      public IterableCoder<?> fromComponents(List<Coder<?>> components) {
+        return IterableCoder.of(components.get(0));
+      }
+    };
+  }
+
+  static CoderTranslator<LengthPrefixCoder<?>> lengthPrefix() {
+    return new CoderTranslator<LengthPrefixCoder<?>>() {
+      @Override
+      public List<? extends Coder<?>> getComponents(LengthPrefixCoder<?> from) {
+        return Collections.singletonList(from.getValueCoder());
+      }
+
+      @Override
+      public LengthPrefixCoder<?> fromComponents(List<Coder<?>> components) {
+        return LengthPrefixCoder.of(components.get(0));
+      }
+    };
+  }
+
+  static CoderTranslator<FullWindowedValueCoder<?>> fullWindowedValue() {
+    return new CoderTranslator<FullWindowedValueCoder<?>>() {
+      @Override
+      public List<? extends Coder<?>> getComponents(FullWindowedValueCoder<?> from) {
+        return ImmutableList.of(from.getValueCoder(), from.getWindowCoder());
+      }
+
+      @Override
+      public FullWindowedValueCoder<?> fromComponents(List<Coder<?>> components) {
+        return WindowedValue.getFullCoder(
+            components.get(0), (Coder<BoundedWindow>) components.get(1));
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 094f21f..6c2caa9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableBiMap;
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
@@ -30,6 +31,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -44,7 +46,6 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 
 /** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
@@ -67,6 +68,22 @@ public class Coders {
           .put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
           .build();
 
+  @VisibleForTesting
+  static final Map<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>>
+      KNOWN_TRANSLATORS =
+          ImmutableMap
+              .<Class<? extends StructuredCoder>, CoderTranslator<? extends StructuredCoder>>
+                  builder()
+              .put(ByteArrayCoder.class, CoderTranslators.atomic(ByteArrayCoder.class))
+              .put(VarLongCoder.class, CoderTranslators.atomic(VarLongCoder.class))
+              .put(IntervalWindowCoder.class, CoderTranslators.atomic(IntervalWindowCoder.class))
+              .put(GlobalWindow.Coder.class, CoderTranslators.atomic(GlobalWindow.Coder.class))
+              .put(KvCoder.class, CoderTranslators.kv())
+              .put(IterableCoder.class, CoderTranslators.iterable())
+              .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
+              .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
+              .build();
+
   public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
     SdkComponents components = SdkComponents.create();
     RunnerApi.Coder coderProto = toProto(coder, components);
@@ -94,18 +111,26 @@ public class Coders {
         coder,
         coder.getClass().getName());
     StructuredCoder<?> stdCoder = (StructuredCoder<?>) coder;
-    List<String> componentIds = new ArrayList<>();
-    for (Coder<?> componentCoder : stdCoder.getComponents()) {
-      componentIds.add(components.registerCoder(componentCoder));
-    }
+    CoderTranslator translator = KNOWN_TRANSLATORS.get(stdCoder.getClass());
+    List<String> componentIds = registerComponents(coder, translator, components);
     return RunnerApi.Coder.newBuilder()
         .addAllComponentCoderIds(componentIds)
         .setSpec(
             SdkFunctionSpec.newBuilder()
-                .setSpec(FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(coder.getClass()))))
+                .setSpec(
+                    FunctionSpec.newBuilder().setUrn(KNOWN_CODER_URNS.get(stdCoder.getClass()))))
         .build();
   }
 
+  private static <T extends Coder<?>> List<String> registerComponents(
+      T coder, CoderTranslator<T> translator, SdkComponents components) throws IOException {
+    List<String> componentIds = new ArrayList<>();
+    for (Coder<?> component : translator.getComponents(coder)) {
+      componentIds.add(components.registerCoder(component));
+    }
+    return componentIds;
+  }
+
   private static RunnerApi.Coder toCustomCoder(Coder<?> coder) throws IOException {
     RunnerApi.Coder.Builder coderBuilder = RunnerApi.Coder.newBuilder();
     return coderBuilder
@@ -141,30 +166,14 @@ public class Coders {
       Coder<?> innerCoder = fromProto(components.getCodersOrThrow(componentId), components);
       coderComponents.add(innerCoder);
     }
-    switch (coderUrn) {
-      case "urn:beam:coders:bytes:0.1":
-        return ByteArrayCoder.of();
-      case "urn:beam:coders:kv:0.1":
-        return KvCoder.of(coderComponents);
-      case "urn:beam:coders:varint:0.1":
-        return VarLongCoder.of();
-      case "urn:beam:coders:interval_window:0.1":
-        return IntervalWindowCoder.of();
-      case "urn:beam:coders:length_prefix:0.1":
-        checkArgument(
-            coderComponents.size() == 1, "Expecting 1 component, got %s", coderComponents.size());
-        return LengthPrefixCoder.of(coderComponents.get(0));
-      case "urn:beam:coders:stream:0.1":
-        return IterableCoder.of(coderComponents);
-      case "urn:beam:coders:global_window:0.1":
-        return GlobalWindow.Coder.INSTANCE;
-      case "urn:beam:coders:windowed_value:0.1":
-        return WindowedValue.FullWindowedValueCoder.of(coderComponents);
-      default:
-        throw new IllegalStateException(
-            String.format(
-                "Unknown coder URN %s. Known URNs: %s", coderUrn, KNOWN_CODER_URNS.values()));
-    }
+    Class<? extends StructuredCoder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn);
+    CoderTranslator<?> translator = KNOWN_TRANSLATORS.get(coderType);
+    checkArgument(
+        translator != null,
+        "Unknown Coder URN %s. Known URNs: %s",
+        coderUrn,
+        KNOWN_CODER_URNS.values());
+    return translator.fromComponents(coderComponents);
   }
 
   private static Coder<?> fromCustomCoder(
@@ -179,6 +188,6 @@ public class Coders {
                 .unpack(BytesValue.class)
                 .getValue()
                 .toByteArray(),
-            protoCoder.getSpec().getSpec().getUrn());
+            "Custom Coder Bytes");
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index ecd0fa5..32a78fa 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.core.construction;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -92,11 +91,20 @@ public class CodersTest {
       }
       Set<Class<? extends StructuredCoder>> missingKnownCoders = new HashSet<>(knownCoderClasses);
       missingKnownCoders.removeAll(knownCoderTests);
-      checkState(
-          missingKnownCoders.isEmpty(),
-          "Missing validation of known coder %s in %s",
+      assertThat(
+          String.format(
+              "Missing validation of known coder %s in %s",
+              missingKnownCoders, CodersTest.class.getSimpleName()),
           missingKnownCoders,
-          CodersTest.class.getSimpleName());
+          Matchers.empty());
+    }
+
+    @Test
+    public void validateCoderTranslators() {
+      assertThat(
+          "Every Known Coder must have a Known Translator",
+          Coders.KNOWN_CODER_URNS.keySet(),
+          equalTo(Coders.KNOWN_TRANSLATORS.keySet()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d1e48750/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 0972b1e..6a1f8ed 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -86,6 +86,10 @@ public class LengthPrefixCoder<T> extends StructuredCoder<T> {
     return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
   }
 
+  public Coder<?> getValueCoder() {
+    return valueCoder;
+  }
+
   @Override
   public List<? extends Coder<?>> getCoderArguments() {
     return ImmutableList.of(valueCoder);