You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:19 UTC

[3/6] beam git commit: [BEAM-1231] Use well known coder types in Java

[BEAM-1231] Use well known coder types in Java


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de4108e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de4108e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de4108e

Branch: refs/heads/master
Commit: 3de4108e3ab96ec0f860d3d40160c7b7e4f5d0f7
Parents: d86db15
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 12:03:31 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/coders/LengthPrefixCoder.java      | 139 +++++++++++++++++++
 .../org/apache/beam/sdk/util/CoderUtils.java    |  28 ++--
 .../beam/sdk/coders/LengthPrefixCoderTest.java  | 116 ++++++++++++++++
 3 files changed, 270 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
new file mode 100644
index 0000000..dd9af32
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.VarInt;
+
+/**
+ * A {@link Coder} which is able to take any existing coder and wrap it such that it is only
+ * invoked in the {@link org.apache.beam.sdk.coders.Coder.Context#OUTER outer context}. The data
+ * representing the element is prefixed with a length using a variable integer encoding.
+ *
+ * @param <T> the type of the values being transcoded
+ */
+public class LengthPrefixCoder<T> extends StandardCoder<T> {
+
+  public static <T> LengthPrefixCoder<T> of(
+      Coder<T> valueCoder) {
+    checkNotNull(valueCoder, "Coder not expected to be null");
+    return new LengthPrefixCoder<>(valueCoder);
+  }
+
+  @JsonCreator
+  public static LengthPrefixCoder<?> of(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+      List<Coder<?>> components) {
+    checkArgument(components.size() == 1,
+                  "Expecting 1 components, got " + components.size());
+    return of(components.get(0));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final Coder<T> valueCoder;
+
+  private LengthPrefixCoder(Coder<T> valueCoder) {
+    this.valueCoder = valueCoder;
+  }
+
+  @Override
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    valueCoder.encode(value, bos, Context.OUTER);  // buffer first so the size is known
+    VarInt.encode(bos.size(), outStream);  // length prefix precedes the payload
+    bos.writeTo(outStream);
+  }
+
+  @Override
+  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+    long size = VarInt.decodeLong(inStream);  // length prefix written by encode
+    return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(valueCoder);
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is deterministic if the nested {@code Coder} is.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    valueCoder.verifyDeterministic();
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is consistent with equals if the nested {@code Coder} is.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return valueCoder.consistentWithEquals();
+  }
+
+  /**
+   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
+   * counting the bytes. The size is known to be the size of the value plus the number of bytes
+   * required to prefix the length.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected long getEncodedElementByteSize(T value, Context context) throws Exception {
+    if (valueCoder instanceof StandardCoder) {
+      // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
+      // the value, adding the number of bytes to represent the length.
+      long valueSize = ((StandardCoder<T>) valueCoder).getEncodedElementByteSize(
+          value, Context.OUTER);
+      return VarInt.getLength(valueSize) + valueSize;
+    }
+
+    // If valueCoder is not a StandardCoder then fall back to the default StandardCoder behavior
+    // of encoding the value and counting the resulting bytes.
+    return super.getEncodedElementByteSize(value, context);
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is cheap if {@code valueCoder} is cheap.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
+    return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 36bf789..7b93b59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.api.client.util.Base64;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -39,10 +40,13 @@ import java.io.OutputStream;
 import java.lang.ref.SoftReference;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.TypeVariable;
+import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -51,15 +55,15 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 public final class CoderUtils {
   private CoderUtils() {}  // Non-instantiable
 
-  /**
-   * Coder class-name alias for a key-value type.
-   */
-  public static final String KIND_PAIR = "kind:pair";
-
-  /**
-   * Coder class-name alias for a stream type.
-   */
-  public static final String KIND_STREAM = "kind:stream";
+  /** A mapping from well known coder types to their implementing classes. */
+  private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
+      ImmutableMap.<String, Class<?>>builder()
+      .put("kind:pair", KvCoder.class)
+      .put("kind:stream", IterableCoder.class)
+      .put("kind:global_window", GlobalWindow.Coder.class)
+      .put("kind:length_prefix", LengthPrefixCoder.class)
+      .put("kind:windowed_value", WindowedValue.FullWindowedValueCoder.class)
+      .build();
 
   private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
       threadLocalOutputStream = new ThreadLocal<>();
@@ -266,10 +270,8 @@ public final class CoderUtils {
             return Class.forName(id);
           }
 
-          if (id.equals(KIND_STREAM)) {
-            return IterableCoder.class;
-          } else if (id.equals(KIND_PAIR)) {
-            return KvCoder.class;
+          if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
+            return WELL_KNOWN_CODER_TYPES.get(id);
           }
 
           // Otherwise, see if the ID is the name of a class in

http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
new file mode 100644
index 0000000..e31c561
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link LengthPrefixCoder}. */
+@RunWith(JUnit4.class)
+public class LengthPrefixCoderTest {
+  private static final StandardCoder<byte[]> TEST_CODER = LengthPrefixCoder.of(ByteArrayCoder.of());
+
+  private static final List<byte[]> TEST_VALUES = Arrays.asList(
+    new byte[]{ 0xa, 0xb, 0xc },
+    new byte[]{ 0xd, 0x3 },
+    new byte[]{ 0xd, 0xe },
+    new byte[]{ });
+
+  @Test
+  public void testCoderSerializable() throws Exception {
+    CoderProperties.coderSerializable(TEST_CODER);
+  }
+
+  @Test
+  public void testEncodedSize() throws Exception {
+    assertEquals(4L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));
+    assertEquals(4L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testObserverIsCheap() throws Exception {
+    Coder<Double> coder = LengthPrefixCoder.of(DoubleCoder.of());  // coder under test
+    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testObserverIsNotCheap() throws Exception {
+    Coder<List<String>> coder = LengthPrefixCoder.of(ListCoder.of(StringUtf8Coder.of()));
+    assertFalse(coder.isRegisterByteSizeObserverCheap(
+        ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testDecodeEncodeEquals() throws Exception {
+    for (byte[] value : TEST_VALUES) {
+      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+    }
+  }
+
+  @Test
+  public void testRegisterByteSizeObserver() throws Exception {
+    CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
+                                   new byte[][]{{ 0xa, 0xb, 0xc }});
+
+    CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
+                                   new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+  }
+
+  @Test
+  public void testStructuralValueConsistentWithEquals() throws Exception {
+    for (byte[] value1 : TEST_VALUES) {
+      for (byte[] value2 : TEST_VALUES) {
+        CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
+      }
+    }
+  }
+
+  // If this changes, it implies the binary format has changed.
+  private static final String EXPECTED_ENCODING_ID = "";
+
+  @Test
+  public void testEncodingId() throws Exception {
+    CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
+  }
+
+  /**
+   * Generated data to check that the wire format has not changed. To regenerate, see
+   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = Arrays.asList(
+      "AwoLDA",
+      "Ag0D",
+      "Ag0O",
+      "AA");
+
+  @Test
+  public void testWireFormatEncode() throws Exception {
+    CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
+  }
+}