You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/09 04:21:24 UTC
[04/13] beam git commit: automated context removal or redirection
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
index dfd4ea2..13a7261 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CustomCoderTest.java
@@ -48,13 +48,13 @@ public class CustomCoderTest {
}
@Override
- public void encode(KV<String, Long> kv, OutputStream out, Context context)
+ public void encode(KV<String, Long> kv, OutputStream out)
throws IOException {
new DataOutputStream(out).writeLong(kv.getValue());
}
@Override
- public KV<String, Long> decode(InputStream inStream, Context context)
+ public KV<String, Long> decode(InputStream inStream)
throws IOException {
return KV.of(key, new DataInputStream(inStream).readLong());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
index d6d7de8..9fb0b82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/NullableCoderTest.java
@@ -167,6 +167,12 @@ public class NullableCoderTest {
private static class EntireStreamExpectingCoder extends AtomicCoder<String> {
@Override
+ public void encode(String value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(
String value, OutputStream outStream, Context context) throws IOException {
checkArgument(context.isWholeStream, "Expected to get entire stream");
@@ -174,6 +180,11 @@ public class NullableCoderTest {
}
@Override
+ public String decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public String decode(InputStream inStream, Context context)
throws CoderException, IOException {
checkArgument(context.isWholeStream, "Expected to get entire stream");
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
index af2c94e..7aa2080 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/StructuredCoderTest.java
@@ -47,7 +47,7 @@ public class StructuredCoderTest {
private static final long serialVersionUID = 0L;
@Override
- public void encode(@Nullable Boolean value, OutputStream outStream, Context context)
+ public void encode(@Nullable Boolean value, OutputStream outStream)
throws CoderException, IOException {
if (value == null) {
outStream.write(2);
@@ -61,7 +61,7 @@ public class StructuredCoderTest {
@Override
@Nullable
public Boolean decode(
- InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
int value = inStream.read();
if (value == 0) {
@@ -110,7 +110,7 @@ public class StructuredCoderTest {
@Override
public void encode(
- @Nullable ObjectIdentityBoolean value, OutputStream outStream, Context context)
+ @Nullable ObjectIdentityBoolean value, OutputStream outStream)
throws CoderException, IOException {
if (value == null) {
outStream.write(2);
@@ -124,7 +124,7 @@ public class StructuredCoderTest {
@Override
@Nullable
public ObjectIdentityBoolean decode(
- InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
int value = inStream.read();
if (value == 0) {
@@ -213,13 +213,13 @@ public class StructuredCoderTest {
private static class Foo<T> extends StructuredCoder<T> {
@Override
- public void encode(T value, OutputStream outStream, Coder.Context context)
+ public void encode(T value, OutputStream outStream)
throws CoderException, IOException {
throw new UnsupportedOperationException();
}
@Override
- public T decode(InputStream inStream, Coder.Context context)
+ public T decode(InputStream inStream)
throws CoderException, IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 83f348c..37db4ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -98,12 +98,12 @@ public class PAssertTest implements Serializable {
}
@Override
- public void encode(NotSerializableObject value, OutputStream outStream, Context context)
+ public void encode(NotSerializableObject value, OutputStream outStream)
throws CoderException, IOException {
}
@Override
- public NotSerializableObject decode(InputStream inStream, Context context)
+ public NotSerializableObject decode(InputStream inStream)
throws CoderException, IOException {
return new NotSerializableObject();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index db5ff2e..375be33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -153,11 +153,11 @@ public class SerializableMatchersTest implements Serializable {
private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
@Override
- public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
+ public void encode(NotSerializableClass value, OutputStream outStream) {
}
@Override
- public NotSerializableClass decode(InputStream inStream, Coder.Context context) {
+ public NotSerializableClass decode(InputStream inStream) {
return new NotSerializableClass();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 546683b..3939800 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -75,14 +75,14 @@ public class WindowSupplierTest {
private static class FailingCoder extends AtomicCoder<BoundedWindow> {
@Override
public void encode(
- BoundedWindow value, OutputStream outStream, Context context)
+ BoundedWindow value, OutputStream outStream)
throws CoderException, IOException {
throw new CoderException("Test Encode Exception");
}
@Override
public BoundedWindow decode(
- InputStream inStream, Context context) throws CoderException, IOException {
+ InputStream inStream) throws CoderException, IOException {
throw new CoderException("Test Decode Exception");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 8a4d563..33c652a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -336,12 +336,23 @@ public class CombineFnsTest {
private static final UserStringCoder INSTANCE = new UserStringCoder();
@Override
+ public void encode(UserString value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(UserString value, OutputStream outStream, Context context)
throws CoderException, IOException {
StringUtf8Coder.of().encode(value.strValue, outStream, context);
}
@Override
+ public UserString decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public UserString decode(InputStream inStream, Context context)
throws CoderException, IOException {
return UserString.of(StringUtf8Coder.of().decode(inStream, context));
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index e4b016b..bd8aee4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -876,6 +876,12 @@ public class CombineTest implements Serializable {
*/
private class CountSumCoder extends AtomicCoder<CountSum> {
@Override
+ public void encode(CountSum value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(CountSum value, OutputStream outStream,
Context context) throws CoderException, IOException {
LONG_CODER.encode(value.count, outStream);
@@ -883,6 +889,11 @@ public class CombineTest implements Serializable {
}
@Override
+ public CountSum decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Coder.Context.NESTED);
+ }
+
+ @Override
public CountSum decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
long count = LONG_CODER.decode(inStream);
@@ -925,12 +936,23 @@ public class CombineTest implements Serializable {
public static Coder<Accumulator> getCoder() {
return new AtomicCoder<Accumulator>() {
@Override
+ public void encode(Accumulator accumulator, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(accumulator, outStream, Coder.Context.NESTED);
+ }
+
+ @Override
public void encode(Accumulator accumulator, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
StringUtf8Coder.of().encode(accumulator.value, outStream, context);
}
@Override
+ public Accumulator decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Coder.Context.NESTED);
+ }
+
+ @Override
public Accumulator decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
return new Accumulator(StringUtf8Coder.of().decode(inStream, context));
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 7e8a1dd..a05d31c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -134,11 +134,11 @@ public class CreateTest {
private static class RecordCoder extends AtomicCoder<Record> {
@Override
- public void encode(Record value, OutputStream outStream, Context context)
+ public void encode(Record value, OutputStream outStream)
throws CoderException, IOException {}
@Override
- public Record decode(InputStream inStream, Context context) throws CoderException, IOException {
+ public Record decode(InputStream inStream) throws CoderException, IOException {
return null;
}
}
@@ -207,15 +207,14 @@ public class CreateTest {
@Override
public void encode(
UnserializableRecord value,
- OutputStream outStream,
- org.apache.beam.sdk.coders.Coder.Context context)
+ OutputStream outStream)
throws CoderException, IOException {
stringCoder.encode(value.myString, outStream);
}
@Override
public UnserializableRecord decode(
- InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
return new UnserializableRecord(stringCoder.decode(inStream));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index aba33eb..0cd885c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -469,13 +469,13 @@ public class GroupByKeyTest {
private DeterministicKeyCoder() {}
@Override
- public void encode(BadEqualityKey value, OutputStream outStream, Context context)
+ public void encode(BadEqualityKey value, OutputStream outStream)
throws IOException {
new DataOutputStream(outStream).writeLong(value.key);
}
@Override
- public BadEqualityKey decode(InputStream inStream, Context context)
+ public BadEqualityKey decode(InputStream inStream)
throws IOException {
return new BadEqualityKey(new DataInputStream(inStream).readLong());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d2cb980..3697211 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -986,12 +986,12 @@ public class ParDoTest implements Serializable {
}
@Override
- public void encode(TestDummy value, OutputStream outStream, Context context)
+ public void encode(TestDummy value, OutputStream outStream)
throws CoderException, IOException {
}
@Override
- public TestDummy decode(InputStream inStream, Context context)
+ public TestDummy decode(InputStream inStream)
throws CoderException, IOException {
return new TestDummy();
}
@@ -1090,12 +1090,23 @@ public class ParDoTest implements Serializable {
}
@Override
+ public void encode(MyInteger value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(MyInteger value, OutputStream outStream, Context context)
throws CoderException, IOException {
delegate.encode(value.getValue(), outStream, context);
}
@Override
+ public MyInteger decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public MyInteger decode(InputStream inStream, Context context) throws CoderException,
IOException {
return new MyInteger(delegate.decode(inStream, context));
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 84f3d69..cdd03d9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -507,12 +507,23 @@ public class ViewTest implements Serializable {
private static class NonDeterministicStringCoder extends AtomicCoder<String> {
@Override
+ public void encode(String value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Coder.Context.NESTED);
+ }
+
+ @Override
public void encode(String value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
StringUtf8Coder.of().encode(value, outStream, context);
}
@Override
+ public String decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Coder.Context.NESTED);
+ }
+
+ @Override
public String decode(InputStream inStream, Coder.Context context)
throws CoderException, IOException {
return StringUtf8Coder.of().decode(inStream, context);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 489493a..a8cd35e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -277,10 +277,10 @@ public class DoFnInvokersTest {
}
@Override
- public void encode(SomeRestriction value, OutputStream outStream, Context context) {}
+ public void encode(SomeRestriction value, OutputStream outStream) {}
@Override
- public SomeRestriction decode(InputStream inStream, Context context) {
+ public SomeRestriction decode(InputStream inStream) {
return null;
}
}
@@ -400,10 +400,10 @@ public class DoFnInvokersTest {
@Override
public void encode(
- RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {}
+ RestrictionWithDefaultTracker value, OutputStream outStream) {}
@Override
- public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) {
+ public RestrictionWithDefaultTracker decode(InputStream inStream) {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 7230a8b..f36e5e1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -50,12 +50,12 @@ public class CoderUtilsTest {
}
@Override
- public void encode(Integer value, OutputStream outStream, Context context) {
+ public void encode(Integer value, OutputStream outStream) {
throw new RuntimeException("not expecting to be called");
}
@Override
- public Integer decode(InputStream inStream, Context context) {
+ public Integer decode(InputStream inStream) {
throw new RuntimeException("not expecting to be called");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index 6ba1d4a..9a80730 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -89,12 +89,12 @@ public class SerializableUtilsTest {
private final Object unserializableField = new Object();
@Override
- public void encode(Object value, OutputStream outStream, Context context)
+ public void encode(Object value, OutputStream outStream)
throws CoderException, IOException {
}
@Override
- public Object decode(InputStream inStream, Context context)
+ public Object decode(InputStream inStream)
throws CoderException, IOException {
return unserializableField;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
index 325c69d..73c7977 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ByteStringCoder.java
@@ -49,6 +49,12 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
private ByteStringCoder() {}
@Override
+ public void encode(ByteString value, OutputStream outStream)
+ throws IOException, CoderException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(ByteString value, OutputStream outStream, Context context)
throws IOException, CoderException {
if (value == null) {
@@ -63,6 +69,11 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
}
@Override
+ public ByteString decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public ByteString decode(InputStream inStream, Context context) throws IOException {
if (context.isWholeStream) {
return ByteString.readFrom(inStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 968a2fa..f73bf2b 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -168,6 +168,12 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
}
@Override
+ public void encode(T value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(T value, OutputStream outStream, Context context) throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null " + protoMessageClass.getSimpleName());
@@ -180,6 +186,11 @@ public class ProtoCoder<T extends Message> extends CustomCoder<T> {
}
@Override
+ public T decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public T decode(InputStream inStream, Context context) throws IOException {
if (context.isWholeStream) {
return getParser().parseFrom(inStream, getExtensionRegistry());
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 33b9f77..f034a03 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -38,7 +38,7 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
}
@Override
- public void encode(TableDestination value, OutputStream outStream, Context context)
+ public void encode(TableDestination value, OutputStream outStream)
throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null value");
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 8ae75c5..c4707da 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -38,6 +38,12 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
}
@Override
+ public void encode(TableRowInfo value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(TableRowInfo value, OutputStream outStream, Context context)
throws IOException {
if (value == null) {
@@ -48,6 +54,11 @@ class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
}
@Override
+ public TableRowInfo decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public TableRowInfo decode(InputStream inStream, Context context)
throws IOException {
return new TableRowInfo(
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index cfec991..e4b6f1f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -38,6 +38,12 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
}
@Override
+ public void encode(TableRow value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(TableRow value, OutputStream outStream, Context context)
throws IOException {
String strValue = MAPPER.writeValueAsString(value);
@@ -45,6 +51,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
}
@Override
+ public TableRow decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public TableRow decode(InputStream inStream, Context context)
throws IOException {
String strValue = StringUtf8Coder.of().decode(inStream, context);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 9e83271..f014039 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -101,7 +101,7 @@ class WriteBundlesToFiles<DestinationT>
}
@Override
- public void encode(Result<DestinationT> value, OutputStream outStream, Context context)
+ public void encode(Result<DestinationT> value, OutputStream outStream)
throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null value");
@@ -112,7 +112,7 @@ class WriteBundlesToFiles<DestinationT>
}
@Override
- public Result<DestinationT> decode(InputStream inStream, Context context) throws IOException {
+ public Result<DestinationT> decode(InputStream inStream) throws IOException {
String filename = stringCoder.decode(inStream);
long fileByteSize = longCoder.decode(inStream);
DestinationT destination = destinationCoder.decode(inStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
index d120f72..5df2bcf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessagePayloadOnlyCoder.java
@@ -34,12 +34,23 @@ public class PubsubMessagePayloadOnlyCoder extends CustomCoder<PubsubMessage> {
}
@Override
+ public void encode(PubsubMessage value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(PubsubMessage value, OutputStream outStream, Context context)
throws IOException {
PAYLOAD_CODER.encode(value.getPayload(), outStream, context);
}
@Override
+ public PubsubMessage decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
return new PubsubMessage(
PAYLOAD_CODER.decode(inStream, context), ImmutableMap.<String, String>of());
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
index 5907c9e..bcf7656 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java
@@ -45,6 +45,12 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
return new PubsubMessageWithAttributesCoder();
}
+ @Override
+ public void encode(PubsubMessage value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
public void encode(PubsubMessage value, OutputStream outStream, Context context)
throws IOException {
PAYLOAD_CODER.encode(value.getPayload(), outStream);
@@ -52,6 +58,11 @@ public class PubsubMessageWithAttributesCoder extends CustomCoder<PubsubMessage>
}
@Override
+ public PubsubMessage decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public PubsubMessage decode(InputStream inStream, Context context) throws IOException {
byte[] payload = PAYLOAD_CODER.decode(inStream);
Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream, context);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index ae320c7..ad38e28 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -108,7 +108,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
@Override
public void encode(
- OutgoingMessage value, OutputStream outStream, Context context)
+ OutgoingMessage value, OutputStream outStream)
throws CoderException, IOException {
ByteArrayCoder.of().encode(value.elementBytes, outStream);
ATTRIBUTES_CODER.encode(value.attributes, outStream);
@@ -118,7 +118,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>,
@Override
public OutgoingMessage decode(
- InputStream inStream, Context context) throws CoderException, IOException {
+ InputStream inStream) throws CoderException, IOException {
byte[] elementBytes = ByteArrayCoder.of().decode(inStream);
Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index e53976e..db8c1b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -369,6 +369,12 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
private PubsubCheckpointCoder() {}
@Override
+ public void encode(PubsubCheckpoint value, OutputStream outStream)
+ throws IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(PubsubCheckpoint value, OutputStream outStream, Context context)
throws IOException {
SUBSCRIPTION_PATH_CODER.encode(
@@ -379,6 +385,11 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub
}
@Override
+ public PubsubCheckpoint decode(InputStream inStream) throws IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public PubsubCheckpoint decode(InputStream inStream, Context context) throws IOException {
String path = SUBSCRIPTION_PATH_CODER.decode(inStream);
List<String> notYetReadIds = LIST_CODER.decode(inStream, context);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d60c721..70d5377 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -748,12 +748,23 @@ public class BigQueryIOTest implements Serializable {
*/
private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
@Override
+ public void encode(PartitionedGlobalWindow window, OutputStream outStream)
+ throws IOException, CoderException {
+ encode(window, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
throws IOException, CoderException {
StringUtf8Coder.of().encode(window.value, outStream, context);
}
@Override
+ public PartitionedGlobalWindow decode(InputStream inStream) throws IOException, CoderException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public PartitionedGlobalWindow decode(InputStream inStream, Context context)
throws IOException, CoderException {
return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
index 8fddfe0..8d2598a 100644
--- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
+++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/WritableCoder.java
@@ -68,13 +68,13 @@ public class WritableCoder<T extends Writable> extends CustomCoder<T> {
}
@Override
- public void encode(T value, OutputStream outStream, Context context) throws IOException {
+ public void encode(T value, OutputStream outStream) throws IOException {
value.write(new DataOutputStream(outStream));
}
@SuppressWarnings("unchecked")
@Override
- public T decode(InputStream inStream, Context context) throws IOException {
+ public T decode(InputStream inStream) throws IOException {
try {
if (type == NullWritable.class) {
// NullWritable has no default constructor
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 7cc043c..501fe09 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -44,16 +44,14 @@ class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
}
@Override
- public void encode(Mutation mutation, OutputStream outStream,
- Coder.Context context) throws IOException {
+ public void encode(Mutation mutation, OutputStream outStream) throws IOException {
MutationType type = getType(mutation);
MutationProto proto = ProtobufUtil.toMutation(type, mutation);
proto.writeDelimitedTo(outStream);
}
@Override
- public Mutation decode(InputStream inStream,
- Coder.Context context) throws IOException {
+ public Mutation decode(InputStream inStream) throws IOException {
return ProtobufUtil.toMutation(MutationProto.parseDelimitedFrom(inStream));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 24a5f7f..1d06635 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -41,13 +41,13 @@ class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
}
@Override
- public void encode(Result value, OutputStream outputStream, Coder.Context context)
+ public void encode(Result value, OutputStream outputStream)
throws IOException {
ProtobufUtil.toResult(value).writeDelimitedTo(outputStream);
}
@Override
- public Result decode(InputStream inputStream, Coder.Context context)
+ public Result decode(InputStream inputStream)
throws IOException {
return ProtobufUtil.toResult(ClientProtos.Result.parseDelimitedFrom(inputStream));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index ba84c2a..e21945f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1597,13 +1597,13 @@ public class KafkaIO {
private static class NullOnlyCoder<T> extends AtomicCoder<T> {
@Override
- public void encode(T value, OutputStream outStream, Context context) {
+ public void encode(T value, OutputStream outStream) {
checkArgument(value == null, "Can only encode nulls");
// Encode as no bytes.
}
@Override
- public T decode(InputStream inStream, Context context) {
+ public T decode(InputStream inStream) {
return null;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index d838a0d..1971060 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -50,6 +50,12 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
}
@Override
+ public void encode(KafkaRecord<K, V> value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
throws CoderException, IOException {
Context nested = context.nested();
@@ -61,6 +67,11 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
}
@Override
+ public KafkaRecord<K, V> decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public KafkaRecord<K, V> decode(InputStream inStream, Context context)
throws CoderException, IOException {
Context nested = context.nested();
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index c6a0174..f233e27 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -43,7 +43,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
}
@Override
- public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
+ public void encode(KinesisRecord value, OutputStream outStream) throws
IOException {
BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
STRING_CODER.encode(value.getSequenceNumber(), outStream);
@@ -56,7 +56,7 @@ class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
}
@Override
- public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
+ public KinesisRecord decode(InputStream inStream) throws IOException {
ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
String sequenceNumber = STRING_CODER.decode(inStream);
String partitionKey = STRING_CODER.decode(inStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index 5b2ec02..d4c0440 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -88,15 +88,8 @@ public class JAXBCoder<T> extends CustomCoder<T> {
}
@Override
- public void encode(T value, OutputStream outStream) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try {
- jaxbMarshaller.get().marshal(value, baos);
- } catch (JAXBException e) {
- throw new CoderException(e);
- }
- VarInt.encode(baos.size(), outStream);
- baos.writeTo(outStream);
+ public void encode(T value, OutputStream outStream) throws CoderException, IOException {
+ encode(value, outStream, Context.NESTED);
}
@Override
@@ -109,11 +102,23 @@ public class JAXBCoder<T> extends CustomCoder<T> {
throw new CoderException(e);
}
} else {
- encode(value, outStream);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ jaxbMarshaller.get().marshal(value, baos);
+ } catch (JAXBException e) {
+ throw new CoderException(e);
+ }
+ VarInt.encode(baos.size(), outStream);
+ baos.writeTo(outStream);
}
}
@Override
+ public T decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public T decode(InputStream inStream, Context context) throws CoderException, IOException {
try {
if (!context.isWholeStream) {
http://git-wip-us.apache.org/repos/asf/beam/blob/b7f3341e/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
index 5386a61..c175e4a 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/JAXBCoderTest.java
@@ -178,18 +178,27 @@ public class JAXBCoderTest {
}
@Override
+ public void encode(TestType value, OutputStream outStream)
+ throws CoderException, IOException {
+ encode(value, outStream, Context.NESTED);
+ }
+
+ @Override
public void encode(TestType value, OutputStream outStream, Context context)
throws CoderException, IOException {
- Context nestedContext = context.nested();
VarIntCoder.of().encode(3, outStream);
jaxbCoder.encode(value, outStream);
VarLongCoder.of().encode(22L, outStream, context);
}
@Override
+ public TestType decode(InputStream inStream) throws CoderException, IOException {
+ return decode(inStream, Context.NESTED);
+ }
+
+ @Override
public TestType decode(InputStream inStream, Context context)
throws CoderException, IOException {
- Context nestedContext = context.nested();
VarIntCoder.of().decode(inStream);
TestType result = jaxbCoder.decode(inStream);
VarLongCoder.of().decode(inStream, context);