You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/14 17:27:21 UTC
[1/2] incubator-beam git commit: Do not modify the context in NullableCoder
Repository: incubator-beam
Updated Branches:
refs/heads/master b7b68e6fb -> 2584ebeb8
Do not modify the context in NullableCoder
For non-null values, the NullableCoder writes nothing to the stream after
the subcoder has encoded the input value. As a result, the subcoder can be
given the caller's context unchanged (rather than a nested context), and it
should see the entire input stream if it is available.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c056b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c056b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c056b17
Branch: refs/heads/master
Commit: 2c056b171b6e329ca5c025eb6fbc81cff29a8950
Parents: b7b68e6
Author: Thomas Groh <tg...@google.com>
Authored: Fri Sep 23 08:58:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 14 10:27:15 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/NullableCoder.java | 4 +-
.../beam/sdk/coders/NullableCoderTest.java | 49 +++++++++++++++++---
2 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c056b17/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
index 9c6c7c0..29b697c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java
@@ -79,7 +79,7 @@ public class NullableCoder<T> extends StandardCoder<T> {
outStream.write(ENCODE_NULL);
} else {
outStream.write(ENCODE_PRESENT);
- valueCoder.encode(value, outStream, context.nested());
+ valueCoder.encode(value, outStream, context);
}
}
@@ -94,7 +94,7 @@ public class NullableCoder<T> extends StandardCoder<T> {
"NullableCoder expects either a byte valued %s (null) or %s (present), got %s",
ENCODE_NULL, ENCODE_PRESENT, b));
}
- return valueCoder.decode(inStream, context.nested());
+ return valueCoder.decode(inStream, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c056b17/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index 61e7e41..425d5ba 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.coders;
+import static com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.theInstance;
import static org.junit.Assert.assertEquals;
@@ -26,9 +27,13 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.testing.CoderProperties;
import org.junit.Rule;
import org.junit.Test;
@@ -76,14 +81,15 @@ public class NullableCoderTest {
* @see org.apache.beam.sdk.coders.PrintBase64Encodings
*/
private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "AQA",
- "AQFh",
- "AQIxMw",
- "AQVoZWxsbw",
+ "AQ",
+ "AWE",
+ "ATEz",
+ "AWhlbGxv",
"AA",
- "AShhIGxvbmdlciBzdHJpbmcgd2l0aCBzcGFjZXMgYW5kIGFsbCB0aGF0",
- "ARlhIHN0cmluZyB3aXRoIGEgCiBuZXdsaW5l",
- "AQ_jgrnjgr_jg6rjg7PjgrA");
+ "AWEgbG9uZ2VyIHN0cmluZyB3aXRoIHNwYWNlcyBhbmQgYWxsIHRoYXQ",
+ "AWEgc3RyaW5nIHdpdGggYSAKIG5ld2xpbmU",
+ "AeOCueOCv-ODquODs-OCsA"
+ );
@Test
public void testWireFormatEncode() throws Exception {
@@ -135,8 +141,37 @@ public class NullableCoderTest {
}
@Test
+ public void testSubcoderRecievesEntireStream() throws Exception {
+ NullableCoder<String> coder = NullableCoder.of(new EntireStreamExpectingCoder());
+
+ CoderProperties.coderDecodeEncodeEqualInContext(coder, Context.OUTER, null);
+ CoderProperties.coderDecodeEncodeEqualInContext(coder, Context.OUTER, "foo");
+ }
+
+ @Test
public void testNestedNullableCoder() {
NullableCoder<Double> coder = NullableCoder.of(DoubleCoder.of());
assertThat(NullableCoder.of(coder), theInstance(coder));
}
+
+ private static class EntireStreamExpectingCoder extends DeterministicStandardCoder<String> {
+ @Override
+ public void encode(
+ String value, OutputStream outStream, Context context) throws IOException {
+ checkArgument(context.isWholeStream, "Expected to get entire stream");
+ StringUtf8Coder.of().encode(value, outStream, context);
+ }
+
+ @Override
+ public String decode(InputStream inStream, Context context)
+ throws CoderException, IOException {
+ checkArgument(context.isWholeStream, "Expected to get entire stream");
+ return StringUtf8Coder.of().decode(inStream, context);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+ }
}
[2/2] incubator-beam git commit: Closes #992
Posted by dh...@apache.org.
Closes #992
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2584ebeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2584ebeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2584ebeb
Branch: refs/heads/master
Commit: 2584ebeb8c57314288a5eba480a8bd2c7dc1b1d2
Parents: b7b68e6 2c056b1
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 14 10:27:16 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Oct 14 10:27:16 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/NullableCoder.java | 4 +-
.../beam/sdk/coders/NullableCoderTest.java | 49 +++++++++++++++++---
2 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------