You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/01/04 00:36:21 UTC

[5/6] beam git commit: Use kind:* for certain well-known coder types.

Use kind:* for certain well-known coder types.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6947d21b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6947d21b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6947d21b

Branch: refs/heads/master
Commit: 6947d21bc1876c408a23b5e98ef813bb59d5b8d1
Parents: 1b76d3d
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 29 14:20:04 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Jan 3 16:35:37 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/IterableCoder.java   |  2 +-
 .../org/apache/beam/sdk/coders/KvCoder.java     |  2 +-
 .../beam/sdk/coders/LengthPrefixCoder.java      |  8 ++-
 .../apache/beam/sdk/coders/StandardCoder.java   | 23 ++++++-
 .../sdk/transforms/windowing/GlobalWindow.java  |  6 ++
 .../org/apache/beam/sdk/util/WindowedValue.java |  2 +-
 .../beam/sdk/coders/IterableCoderTest.java      | 12 +++-
 .../org/apache/beam/sdk/coders/KvCoderTest.java | 11 +++-
 .../beam/sdk/coders/LengthPrefixCoderTest.java  | 31 ++++++----
 .../transforms/windowing/GlobalWindowTest.java  | 64 ++++++++++++++++++++
 .../apache/beam/sdk/util/WindowedValueTest.java | 11 ++++
 11 files changed, 150 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
index cc6b970..a1f6fa3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableCoder.java
@@ -69,7 +69,7 @@ public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
 
   @Override
   protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
+    CloudObject result = CloudObject.forClassName("kind:stream");
     addBoolean(result, PropertyNames.IS_STREAM_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
index 1e70a30..c9d05fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java
@@ -123,7 +123,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> {
 
   @Override
   protected CloudObject initializeCloudObject() {
-    CloudObject result = CloudObject.forClass(getClass());
+    CloudObject result = CloudObject.forClassName("kind:pair");
     addBoolean(result, PropertyNames.IS_PAIR_LIKE, true);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
index 7319200..d123a38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/LengthPrefixCoder.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.VarInt;
 
@@ -65,7 +66,10 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
     this.valueCoder = valueCoder;
   }
 
-
+  @Override
+  protected CloudObject initializeCloudObject() {
+    return CloudObject.forClassName("kind:length_prefix");
+  }
 
   @Override
   public void encode(T value, OutputStream outStream, Context context)
@@ -125,7 +129,7 @@ public class LengthPrefixCoder<T> extends StandardCoder<T> {
     }
 
     // If value is not a StandardCoder then fall back to the default StandardCoder behavior
-    // of encoding and counting the bytes. The encoding will include the null indicator byte.
+    // of encoding and counting the bytes. The encoding will include the length prefix.
     return super.getEncodedElementByteSize(value, context);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
index c17c376..e9a1bd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java
@@ -121,8 +121,19 @@ public abstract class StandardCoder<T> implements Coder<T> {
   }
 
   /**
-   * {@link StandardCoder} implementations should override {@link #initializeCloudObject}
-   * if the default {@link CloudObject} representation needs to change.
+   * Adds the following properties to the {@link CloudObject} representation:
+   * <ul>
+   *   <li>component_encodings: A list of coders represented as {@link CloudObject}s
+   *       equivalent to the {@link #getCoderArguments}.</li>
+   *   <li>encoding_id: An identifier for the binary format written by {@link #encode}. See
+   *       {@link #getEncodingId} for further details.</li>
+   *   <li>allowed_encodings: A collection of encodings supported by {@link #decode} in
+   *       addition to the encoding from {@link #getEncodingId()} (which is assumed supported).
+   *       See {@link #getAllowedEncodings} for further details.</li>
+   * </ul>
+   *
+   * <p>{@link StandardCoder} implementations should override {@link #initializeCloudObject}
+   * to customize the {@link CloudObject} representation.
    */
   @Override
   public final CloudObject asCloudObject() {
@@ -151,6 +162,14 @@ public abstract class StandardCoder<T> implements Coder<T> {
     return result;
   }
 
+  /**
+   * Subclasses should override this method to customize the {@link CloudObject}
+   * representation. {@link StandardCoder#asCloudObject} delegates to this method
+   * to provide an initial {@link CloudObject}.
+   *
+   * <p>The default implementation returns a {@link CloudObject} using
+   * {@link Object#getClass} for the type.
+   */
   protected CloudObject initializeCloudObject() {
     return CloudObject.forClass(getClass());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index 58b059a..c27749d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing;
 import java.io.InputStream;
 import java.io.OutputStream;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.util.CloudObject;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -62,6 +63,11 @@ public class GlobalWindow extends BoundedWindow {
       return GlobalWindow.INSTANCE;
     }
 
+    @Override
+    protected CloudObject initializeCloudObject() {
+      return CloudObject.forClassName("kind:global_window");
+    }
+
     private Coder() {}
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index ce13317..6b75951 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -697,7 +697,7 @@ public abstract class WindowedValue<T> {
 
     @Override
     public CloudObject initializeCloudObject() {
-      CloudObject result = CloudObject.forClass(getClass());
+      CloudObject result = CloudObject.forClassName("kind:windowed_value");
       addBoolean(result, PropertyNames.IS_WRAPPER, true);
       return result;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
index 15ec44b..2df768e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/IterableCoderTest.java
@@ -19,13 +19,16 @@ package org.apache.beam.sdk.coders;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Structs;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -42,7 +45,14 @@ public class IterableCoderTest {
       Collections.<Integer>emptyList(),
       Collections.<Integer>singletonList(13),
       Arrays.<Integer>asList(1, 2, 3, 4),
-      new LinkedList<Integer>(Arrays.asList(7, 6, 5)));
+      new LinkedList<>(Arrays.asList(7, 6, 5)));
+
+  @Test
+  public void testCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = TEST_CODER.asCloudObject();
+    assertEquals("kind:stream", cloudObject.getClassName());
+    assertTrue(Structs.getBoolean(cloudObject, "is_stream_like"));
+  }
 
   @Test
   public void testDecodeEncodeContentsInSameOrder() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index 4c07c83..1d1f905 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.coders;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.Structs;
 import org.apache.beam.sdk.values.KV;
 import org.junit.Rule;
 import org.junit.Test;
@@ -96,8 +101,10 @@ public class KvCoderTest {
       KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
 
   @Test
-  public void testEnc() {
-    System.out.println(TEST_CODER.asCloudObject());
+  public void testCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = TEST_CODER.asCloudObject();
+    assertEquals("kind:pair", cloudObject.getClassName());
+    assertTrue(Structs.getBoolean(cloudObject, "is_pair_like"));
   }
 
   private static final List<KV<String, Integer>> TEST_VALUES =

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
index e31c561..95d405d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/LengthPrefixCoderTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,6 +41,22 @@ public class LengthPrefixCoderTest {
     new byte[]{ 0xd, 0xe },
     new byte[]{ });
 
+  /**
+   * Generated data to check that the wire format has not changed. To regenerate, see
+   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+   */
+  private static final List<String> TEST_ENCODINGS = ImmutableList.of(
+      "AwoLDA",
+      "Ag0D",
+      "Ag0O",
+      "AA");
+
+  @Test
+  public void testCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = TEST_CODER.asCloudObject();
+    assertEquals("kind:length_prefix", cloudObject.getClassName());
+  }
+
   @Test
   public void testCoderSerializable() throws Exception {
     CoderProperties.coderSerializable(TEST_CODER);
@@ -76,10 +93,10 @@ public class LengthPrefixCoderTest {
   @Test
   public void testRegisterByteSizeObserver() throws Exception {
     CoderProperties.testByteCount(TEST_CODER, Coder.Context.OUTER,
-                                   new byte[][]{{ 0xa, 0xb, 0xc }});
+        new byte[][]{{ 0xa, 0xb, 0xc }});
 
     CoderProperties.testByteCount(TEST_CODER, Coder.Context.NESTED,
-                                   new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
+        new byte[][]{{ 0xa, 0xb, 0xc }, {}, {}, { 0xd, 0xe }, {}});
   }
 
   @Test
@@ -99,16 +116,6 @@ public class LengthPrefixCoderTest {
     CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
   }
 
-  /**
-   * Generated data to check that the wire format has not changed. To regenerate, see
-   * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
-   */
-  private static final List<String> TEST_ENCODINGS = Arrays.asList(
-      "AwoLDA",
-      "Ag0D",
-      "Ag0O",
-      "AA");
-
   @Test
   public void testWireFormatEncode() throws Exception {
     CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
new file mode 100644
index 0000000..1857332
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/GlobalWindowTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CloudObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GlobalWindow}. */
+@RunWith(JUnit4.class)
+public class GlobalWindowTest {
+  @Test
+  public void testCoderCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = GlobalWindow.Coder.INSTANCE.asCloudObject();
+    assertEquals("kind:global_window", cloudObject.getClassName());
+  }
+
+  @Test
+  public void testCoderBinaryRepresentation() throws Exception {
+    CountingOutputStream out = new CountingOutputStream(ByteStreams.nullOutputStream());
+    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.OUTER);
+    assertEquals(0, out.getCount());
+    GlobalWindow.Coder.INSTANCE.encode(GlobalWindow.INSTANCE, out, Context.NESTED);
+    assertEquals(0, out.getCount());
+  }
+
+  @Test
+  public void testCoderEncodeDecodeEquals() throws Exception {
+    CoderProperties.coderDecodeEncodeEqual(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+  }
+
+  @Test
+  public void testCoderIsSerializable() {
+    CoderProperties.coderSerializable(GlobalWindow.Coder.INSTANCE);
+  }
+
+  @Test
+  public void testCoderIsDeterministic() throws Exception {
+    CoderProperties.coderDeterministic(
+        GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE, GlobalWindow.INSTANCE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6947d21b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
index 0c69a59..1c67ef0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java
@@ -20,7 +20,9 @@ package org.apache.beam.sdk.util;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -29,6 +31,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
@@ -64,6 +67,14 @@ public class WindowedValueTest {
   }
 
   @Test
+  public void testWindowedValueCoderCloudObjectRepresentation() throws Exception {
+    CloudObject cloudObject = WindowedValue.getFullCoder(
+        StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject();
+    assertEquals("kind:windowed_value", cloudObject.getClassName());
+    assertTrue(Structs.getBoolean(cloudObject, "is_wrapper"));
+  }
+
+  @Test
   public void testExplodeWindowsInNoWindowsEmptyIterable() {
     WindowedValue<String> value =
         WindowedValue.of(