You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:19 UTC
[3/6] beam git commit: [BEAM-1231] Use well known coder types in Java
[BEAM-1231] Use well known coder types in Java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de4108e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de4108e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de4108e
Branch: refs/heads/master
Commit: 3de4108e3ab96ec0f860d3d40160c7b7e4f5d0f7
Parents: d86db15
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 12:03:31 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800
----------------------------------------------------------------------
.../beam/sdk/coders/LengthPrefixCoder.java | 139 +++++++++++++++++++
.../org/apache/beam/sdk/util/CoderUtils.java | 28 ++--
.../beam/sdk/coders/LengthPrefixCoderTest.java | 116 ++++++++++++++++
3 files changed, 270 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
new file mode 100644
index 0000000..dd9af32
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.VarInt;
+
+/**
+ * A {@link Coder} which is able to take any existing coder and wrap it such that it is only
+ * invoked in the {@link org.apache.beam.sdk.coders.Coder.Context#OUTER outer context}. The data
+ * representing the element is prefixed with its length in bytes using a variable integer
+ * encoding ({@link VarInt}), which lets the wrapped coder always run in the outer context even
+ * when this coder is used in a nested context.
+ *
+ * @param <T> the type of the values being transcoded
+ */
+public class LengthPrefixCoder<T> extends StandardCoder<T> {
+
+  /**
+   * Returns a {@code LengthPrefixCoder} that length-prefixes the output of {@code valueCoder}.
+   *
+   * @throws NullPointerException if {@code valueCoder} is null
+   */
+  public static <T> LengthPrefixCoder<T> of(Coder<T> valueCoder) {
+    checkNotNull(valueCoder, "Coder not expected to be null");
+    return new LengthPrefixCoder<>(valueCoder);
+  }
+
+  /**
+   * Jackson creator used when deserializing a coder specification. {@code components} must
+   * contain exactly one element: the wrapped value coder.
+   *
+   * @throws IllegalArgumentException if {@code components} does not have exactly one element
+   */
+  @JsonCreator
+  public static LengthPrefixCoder<?> of(
+      @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
+    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+    return of(components.get(0));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** The coder used for the payload that follows the length prefix. */
+  private final Coder<T> valueCoder;
+
+  private LengthPrefixCoder(Coder<T> valueCoder) {
+    this.valueCoder = valueCoder;
+  }
+
+  /**
+   * Encodes {@code value} with the wrapped coder in the outer context, then writes the number of
+   * bytes produced as a {@link VarInt} followed by those bytes. The same layout is written for
+   * every {@code context}.
+   */
+  @Override
+  public void encode(T value, OutputStream outStream, Context context)
+      throws CoderException, IOException {
+    // Buffer the payload first because its length has to be written before it.
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    valueCoder.encode(value, bos, Context.OUTER);
+    VarInt.encode(bos.size(), outStream);
+    bos.writeTo(outStream);
+  }
+
+  /**
+   * Reads a {@link VarInt} length prefix, then decodes the value with the wrapped coder in the
+   * outer context from a view of {@code inStream} limited to that many bytes.
+   *
+   * @throws CoderException if the decoded length prefix is negative, which indicates corrupt
+   *     input
+   */
+  @Override
+  public T decode(InputStream inStream, Context context) throws CoderException, IOException {
+    long size = VarInt.decodeLong(inStream);
+    if (size < 0) {
+      // ByteStreams.limit rejects negative limits with an IllegalArgumentException; report the
+      // corrupt prefix as a coding failure instead.
+      throw new CoderException("Invalid negative length prefix: " + size);
+    }
+    return valueCoder.decode(ByteStreams.limit(inStream, size), Context.OUTER);
+  }
+
+  /** Returns a singleton list containing the wrapped value coder. */
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return ImmutableList.of(valueCoder);
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is deterministic if the nested {@code Coder} is.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    valueCoder.verifyDeterministic();
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is consistent with equals if the nested {@code Coder} is.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return valueCoder.consistentWithEquals();
+  }
+
+  /**
+   * Overridden to short-circuit the default {@code StandardCoder} behavior of encoding and
+   * counting the bytes when the wrapped coder is itself a {@code StandardCoder}. In that case the
+   * size is the size of the value encoded in the outer context plus the number of bytes required
+   * to write that size as a {@link VarInt}.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  protected long getEncodedElementByteSize(T value, Context context) throws Exception {
+    if (valueCoder instanceof StandardCoder) {
+      // If valueCoder is a StandardCoder then we can ask it directly for the encoded size of
+      // the value, adding the number of bytes to represent the length.
+      long valueSize = ((StandardCoder<T>) valueCoder).getEncodedElementByteSize(
+          value, Context.OUTER);
+      return VarInt.getLength(valueSize) + valueSize;
+    }
+
+    // If valueCoder is not a StandardCoder then fall back to the default StandardCoder behavior
+    // of encoding and counting the bytes. The encoding will include the length prefix.
+    return super.getEncodedElementByteSize(value, context);
+  }
+
+  /**
+   * {@code LengthPrefixCoder} is cheap if {@code valueCoder} is cheap.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(@Nullable T value, Context context) {
+    return valueCoder.isRegisterByteSizeObserverCheap(value, Context.OUTER);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
index 36bf789..7b93b59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java
@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.api.client.util.Base64;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -39,10 +40,13 @@ import java.io.OutputStream;
import java.lang.ref.SoftReference;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.TypeVariable;
+import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -51,15 +55,15 @@ import org.apache.beam.sdk.values.TypeDescriptor;
public final class CoderUtils {
private CoderUtils() {} // Non-instantiable utility class; the constructor is intentionally private.
- /**
- * Coder class-name alias for a key-value type.
- */
- public static final String KIND_PAIR = "kind:pair";
-
- /**
- * Coder class-name alias for a stream type.
- */
- public static final String KIND_STREAM = "kind:stream";
+  /** Well-known coder type names, each mapped to the class that implements that coder. */
+  private static final Map<String, Class<?>> WELL_KNOWN_CODER_TYPES =
+      ImmutableMap.<String, Class<?>>of(
+          "kind:pair", KvCoder.class,
+          "kind:stream", IterableCoder.class,
+          "kind:global_window", GlobalWindow.Coder.class,
+          "kind:length_prefix", LengthPrefixCoder.class,
+          "kind:windowed_value", WindowedValue.FullWindowedValueCoder.class);
private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
threadLocalOutputStream = new ThreadLocal<>();
@@ -266,10 +270,8 @@ public final class CoderUtils {
return Class.forName(id);
}
- if (id.equals(KIND_STREAM)) {
- return IterableCoder.class;
- } else if (id.equals(KIND_PAIR)) {
- return KvCoder.class;
+ if (WELL_KNOWN_CODER_TYPES.containsKey(id)) {
+ return WELL_KNOWN_CODER_TYPES.get(id);
}
// Otherwise, see if the ID is the name of a class in
http://git-wip-us.apache.org/repos/asf/beam/blob/3de4108e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
new file mode 100644
index 0000000..e31c561
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link LengthPrefixCoder}. */
+@RunWith(JUnit4.class)
+public class LengthPrefixCoderTest {
+  private static final StandardCoder<byte[]> TEST_CODER = LengthPrefixCoder.of(ByteArrayCoder.of());
+
+  private static final List<byte[]> TEST_VALUES = Arrays.asList(
+      new byte[]{ 0xa, 0xb, 0xc },
+      new byte[]{ 0xd, 0x3 },
+      new byte[]{ 0xd, 0xe },
+      new byte[]{ });
+
+  @Test
+  public void testCoderSerializable() throws Exception {
+    CoderProperties.coderSerializable(TEST_CODER);
+  }
+
+  @Test
+  public void testEncodedSize() throws Exception {
+    // 3 payload bytes plus a 1-byte VarInt length prefix, regardless of the context.
+    assertEquals(4L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.NESTED));
+    assertEquals(4L,
+        TEST_CODER.getEncodedElementByteSize(TEST_VALUES.get(0), Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testObserverIsCheap() throws Exception {
+    LengthPrefixCoder<Double> coder = LengthPrefixCoder.of(DoubleCoder.of());
+    assertTrue(coder.isRegisterByteSizeObserverCheap(5.0, Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testObserverIsNotCheap() throws Exception {
+    LengthPrefixCoder<List<String>> coder =
+        LengthPrefixCoder.of(ListCoder.of(StringUtf8Coder.of()));
+    assertFalse(coder.isRegisterByteSizeObserverCheap(
+        ImmutableList.of("hi", "test"), Coder.Context.OUTER));
+  }
+
+  @Test
+  public void testDecodeEncodeEquals() throws Exception {
+    for (byte[] value : TEST_VALUES) {
+      CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
+    }
+  }
+
+  @Test
+  public void testRegisterByteSizeObserver() throws Exception {
+    CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
+        new byte[][]{{ 0xa, 0xb, 0xc }});
+
+    CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
+        new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+  }
+
+  @Test
+  public void testStructuralValueConsistentWithEquals() throws Exception {
+    for (byte[] value1 : TEST_VALUES) {
+      for (byte[] value2 : TEST_VALUES) {
+        CoderProperties.structuralValueConsistentWithEquals(TEST_CODER, value1, value2);
+      }
+    }
+  }
+
+  // If this changes, it implies the binary format has changed.
+  private static final String EXPECTED_ENCODING_ID = "";
+
+  @Test
+  public void testEncodingId() throws Exception {
+    CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
+  }
+
+  /**
+   * Generated data to check that the wire format has not changed. Each entry is a one-byte
+   * VarInt length followed by the raw bytes of the corresponding {@code TEST_VALUES} element. To
+   * regenerate, see {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = Arrays.asList(
+      "AwoLDA",
+      "Ag0D",
+      "Ag0O",
+      "AA");
+
+  @Test
+  public void testWireFormatEncode() throws Exception {
+    CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
+  }
+}