You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:21 UTC
[5/6] beam git commit: Use kind:* for certain well-known coder types.
Use kind:* for certain well-known coder types.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6947d21b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6947d21b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6947d21b
Branch: refs/heads/master
Commit: 6947d21bc1876c408a23b5e98ef813bb59d5b8d1
Parents: 1b76d3d
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 14:20:04 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/coders/IterableCoder.java | 2 +-
.../org/apache/beam/sdk/coders/KvCoder.java | 2 +-
.../beam/sdk/coders/LengthPrefixCoder.java | 8 ++-
.../apache/beam/sdk/coders/StandardCoder.java | 23 ++++++-
.../sdk/transforms/windowing/GlobalWindow.java | 6 ++
.../org/apache/beam/sdk/util/WindowedValue.java | 2 +-
.../beam/sdk/coders/IterableCoderTest.java | 12 +++-
.../org/apache/beam/sdk/coders/KvCoderTest.java | 11 +++-
.../beam/sdk/coders/LengthPrefixCoderTest.java | 31 ++++++----
.../transforms/windowing/GlobalWindowTest.java | 64 ++++++++++++++++++++
.../apache/beam/sdk/util/WindowedValueTest.java | 11 ++++
11 files changed, 150 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index cc6b970..a1f6fa3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -69,7 +69,7 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
@Override
protected CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
+ CloudObject result = CloudObject.forClassName("kind:stream");
addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 1e70a30..c9d05fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -123,7 +123,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
@Override
protected CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
+ CloudObject result = CloudObject.forClassName("kind:pair");
addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 7319200..d123a38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.VarInt;
@@ -65,7 +66,10 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
this.valueCoder = valueCoder;
}
-
+ @Override
+ protected CloudObject initializeCloudObject() {
+ return CloudObject.forClassName("kind:length_prefix");
+ }
@Override
public void encode(T value, OutputStream outStream, Context context)
@@ -125,7 +129,7 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
}
// If value is not a StandardCoder then fall back to the default StandardCoder behavior
- // of encoding and counting the bytes. The encoding will include the null indicator byte.
+ // of encoding and counting the bytes. The encoding will include the length prefix.
return super.getEncodedElementByteSize(value, context);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index c17c376..e9a1bd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -121,8 +121,19 @@ public abstract class StandardCoder<T> implements Coder<T> {
}
/**
- * {@link StandardCoder} implementations should override {@link #initializeCloudObject}
- * if the default {@link CloudObject} representation needs to change.
+ * Adds the following properties to the {@link CloudObject} representation:
+ * <ul>
+ * <li>component_encodings: A list of coders represented as {@link CloudObject}s
+ * equivalent to the {@link #getCoderArguments}.</li>
+ * <li>encoding_id: An identifier for the binary format written by {@link #encode}. See
+ * {@link #getEncodingId} for further details.</li>
+ * <li>allowed_encodings: A collection of encodings supported by {@link #decode} in
+ * addition to the encoding from {@link #getEncodingId()} (which is assumed supported).
+ * See {@link #getAllowedEncodings} for further details.</li>
+ * </ul>
+ *
+ * <p>{@link StandardCoder} implementations should override {@link #initializeCloudObject}
+ * to customize the {@link CloudObject} representation.
*/
@Override
public final CloudObject asCloudObject() {
@@ -151,6 +162,14 @@ public abstract class StandardCoder<T> implements Coder<T> {
return result;
}
+ /**
+ * Subclasses should override this method to customize the {@link CloudObject}
+ * representation. {@link StandardCoder#asCloudObject} delegates to this method
+ * to provide an initial {@link CloudObject}.
+ *
+ * <p>The default implementation returns a {@link CloudObject} using
+ * {@link Object#getClass} for the type.
+ */
protected CloudObject initializeCloudObject() {
return CloudObject.forClass(getClass());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 58b059a..c27749d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.util.CloudObject;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -62,6 +63,11 @@ public class GlobalWindow extends BoundedWindow {
return GlobalWindow.INSTANCE;
}
+ @Override
+ protected CloudObject initializeCloudObject() {
+ return CloudObject.forClassName("kind:global_window");
+ }
+
private Coder() {}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index ce13317..6b75951 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -697,7 +697,7 @@ public abstract class WindowedValue<T> {
@Override
public CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
+ CloudObject result = CloudObject.forClassName("kind:windowed_value");
addBoolean(result, PropertyNames.IS_WRAPPER, true);
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 15ec44b..2df768e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -19,13 +19,16 @@ package org.apache.beam.sdk.coders;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Structs;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -42,7 +45,14 @@ public class IterableCoderTest {
Collections.<Integer>emptyList(),
Collections.<Integer>singletonList(13),
Arrays.<Integer>asList(1, 2, 3, 4),
- new LinkedList<Integer>(Arrays.asList(7, 6, 5)));
+ new LinkedList<>(Arrays.asList(7, 6, 5)));
+
+ @Test
+ public void testCloudObjectRepresentation() throws Exception {
+ CloudObject cloudObject = TEST_CODER.asCloudObject();
+ assertEquals("kind:stream", cloudObject.getClassName());
+ assertTrue(Structs.getBoolean(cloudObject, "is_stream_like"));
+ }
@Test
public void testDecodeEncodeContentsInSameOrder() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 4c07c83..1d1f905 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -17,11 +17,16 @@
*/
package org.apache.beam.sdk.coders;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.values.KV;
import org.junit.Rule;
import org.junit.Test;
@@ -96,8 +101,10 @@ public class KvCoderTest {
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
@Test
- public void testEnc() {
- System.out.println(TEST_CODER.asCloudObject());
+ public void testCloudObjectRepresentation() throws Exception {
+ CloudObject cloudObject = TEST_CODER.asCloudObject();
+ assertEquals("kind:pair", cloudObject.getClassName());
+ assertTrue(Structs.getBoolean(cloudObject, "is_pair_like"));
}
private static final List<KV<String, Integer>> TEST_VALUES =
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index e31c561..95d405d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -40,6 +41,22 @@ public class LengthPrefixCoderTest {
new byte[]{ 0xd, 0xe },
new byte[]{ });
+ /**
+ * Generated data to check that the wire format has not changed. To regenerate, see
+ * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+ */
+ private static final List<String> TEST_ENCODINGS = ImmutableList.of(
+ "AwoLDA",
+ "Ag0D",
+ "Ag0O",
+ "AA");
+
+ @Test
+ public void testCloudObjectRepresentation() throws Exception {
+ CloudObject cloudObject = TEST_CODER.asCloudObject();
+ assertEquals("kind:length_prefix", cloudObject.getClassName());
+ }
+
@Test
public void testCoderSerializable() throws Exception {
CoderProperties.coderSerializable(TEST_CODER);
@@ -76,10 +93,10 @@ public class LengthPrefixCoderTest {
@Test
public void testRegisterByteSizeObserver() throws Exception {
CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
- new byte[][]{{ 0xa, 0xb, 0xc }});
+ new byte[][]{{ 0xa, 0xb, 0xc }});
CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
- new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+ new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
}
@Test
@@ -99,16 +116,6 @@ public class LengthPrefixCoderTest {
CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
}
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "AwoLDA",
- "Ag0D",
- "Ag0O",
- "AA");
-
@Test
public void testWireFormatEncode() throws Exception {
CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
new file mode 100644
index 0000000..1857332
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GlobalWindow}. */
+@RunWith(JUnit4.class)
+public class GlobalWindowTest {
+ @Test
+ public void testCoderCloudObjectRepresentation() throws Exception {
+ CloudObject cloudObject = GlobalWindow.Coder.INSTANCE.asCloudObject();
+ assertEquals("kind:global_window", cloudObject.getClassName());
+ }
+
+ @Test
+ public void testCoderBinaryRepresentation() throws Exception {
+ CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream());
+ GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER);
+ assertEquals(0, out.getCount());
+ GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED);
+ assertEquals(0, out.getCount());
+ }
+
+ @Test
+ public void testCoderEncodeDecodeEquals() throws Exception {
+ CoderProperties.coderDecodeEncodeEqual(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+ }
+
+ @Test
+ public void testCoderIsSerializable() {
+ CoderProperties.coderSerializable(GlobalWindow.Coder.INSTANCE);
+ }
+
+ @Test
+ public void testCoderIsDeterministic() throws Exception {
+ CoderProperties.coderDeterministic(
+ GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 0c69a59..1c67ef0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -20,7 +20,9 @@ package org.apache.beam.sdk.util;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -64,6 +67,14 @@ public class WindowedValueTest {
}
@Test
+ public void testWindowedValueCoderCloudObjectRepresentation() throws Exception {
+ CloudObject cloudObject = WindowedValue.getFullCoder(
+ StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject();
+ assertEquals("kind:windowed_value", cloudObject.getClassName());
+ assertTrue(Structs.getBoolean(cloudObject, "is_wrapper"));
+ }
+
+ @Test
public void testExplodeWindowsInNoWindowsEmptyIterable() {
WindowedValue<String> value =
WindowedValue.of(