You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/25 01:03:24 UTC
[1/3] beam git commit: This closes #2612
Repository: beam
Updated Branches:
refs/heads/master 02e60a5c5 -> 7e6f1b78b
This closes #2612
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e6f1b78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e6f1b78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e6f1b78
Branch: refs/heads/master
Commit: 7e6f1b78bb7962d117a847fa75556710ad075799
Parents: 02e60a5 7ad9efc
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 24 16:47:29 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 24 16:47:29 2017 -0700
----------------------------------------------------------------------
.../direct/CloningBundleFactoryTest.java | 10 ++---
.../beam/runners/direct/DirectRunnerTest.java | 5 +--
.../UnboundedReadEvaluatorFactoryTest.java | 4 +-
.../runners/dataflow/internal/IsmFormat.java | 41 +++++++++++++++-----
.../runners/dataflow/util/RandomAccessData.java | 14 +++++--
.../apache/beam/sdk/coders/BigDecimalCoder.java | 31 +++++++++------
.../beam/sdk/coders/BigEndianIntegerCoder.java | 12 ++++--
.../beam/sdk/coders/BigEndianLongCoder.java | 10 ++++-
.../apache/beam/sdk/coders/BigIntegerCoder.java | 23 +++++++----
.../apache/beam/sdk/coders/ByteArrayCoder.java | 18 ++++++++-
.../org/apache/beam/sdk/coders/ByteCoder.java | 9 +++--
.../apache/beam/sdk/coders/ByteStringCoder.java | 7 ++--
.../org/apache/beam/sdk/coders/CustomCoder.java | 22 ++++++++++-
.../org/apache/beam/sdk/coders/DoubleCoder.java | 9 +++--
.../apache/beam/sdk/coders/DurationCoder.java | 19 +++++----
.../apache/beam/sdk/coders/InstantCoder.java | 25 +++++++-----
.../beam/sdk/coders/SerializableCoder.java | 2 +-
.../apache/beam/sdk/coders/StringUtf8Coder.java | 7 ++--
.../beam/sdk/coders/TextualIntegerCoder.java | 14 +++++--
.../org/apache/beam/sdk/coders/VarIntCoder.java | 12 ++++--
.../apache/beam/sdk/coders/VarLongCoder.java | 22 +++++++++--
.../org/apache/beam/sdk/coders/VoidCoder.java | 5 ++-
.../org/apache/beam/sdk/io/FileBasedSink.java | 9 +----
.../org/apache/beam/sdk/transforms/Mean.java | 10 ++++-
.../sdk/transforms/windowing/GlobalWindow.java | 17 +++++++-
.../transforms/windowing/IntervalWindow.java | 20 ++++++++--
.../beam/sdk/transforms/windowing/PaneInfo.java | 7 +++-
.../org/apache/beam/sdk/util/BitSetCoder.java | 13 +++----
.../apache/beam/sdk/testing/PAssertTest.java | 4 +-
.../sdk/testing/SerializableMatchersTest.java | 4 +-
.../beam/sdk/testing/WindowSupplierTest.java | 4 +-
.../apache/beam/sdk/transforms/CreateTest.java | 3 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 9 +++--
.../apache/beam/sdk/transforms/ParDoTest.java | 6 ++-
.../apache/beam/sdk/util/CoderUtilsTest.java | 4 +-
.../sdk/extensions/protobuf/ProtoCoder.java | 4 +-
.../io/gcp/bigquery/TableDestinationCoder.java | 6 +--
.../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 6 +--
.../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 12 ++++--
.../io/gcp/bigquery/WriteBundlesToFiles.java | 7 ++--
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 4 +-
.../io/gcp/pubsub/PubsubUnboundedSource.java | 4 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++--
.../beam/sdk/io/hbase/HBaseMutationCoder.java | 5 +--
.../beam/sdk/io/hbase/HBaseResultCoder.java | 5 +--
sdks/java/io/kafka/pom.xml | 5 ---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +-
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 16 +-------
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-
sdks/java/io/xml/pom.xml | 5 ---
.../org/apache/beam/sdk/io/xml/JAXBCoder.java | 39 ++-----------------
51 files changed, 342 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
[3/3] beam git commit: Stop Extending AtomicCoder
Posted by tg...@apache.org.
Stop Extending AtomicCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7ad9efc7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7ad9efc7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7ad9efc7
Branch: refs/heads/master
Commit: 7ad9efc7a678f042d67cd90536f21aca4736e4c3
Parents: 02e60a5
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 20 09:39:12 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 24 16:47:29 2017 -0700
----------------------------------------------------------------------
.../direct/CloningBundleFactoryTest.java | 10 ++---
.../beam/runners/direct/DirectRunnerTest.java | 5 +--
.../UnboundedReadEvaluatorFactoryTest.java | 4 +-
.../runners/dataflow/internal/IsmFormat.java | 41 +++++++++++++++-----
.../runners/dataflow/util/RandomAccessData.java | 14 +++++--
.../apache/beam/sdk/coders/BigDecimalCoder.java | 31 +++++++++------
.../beam/sdk/coders/BigEndianIntegerCoder.java | 12 ++++--
.../beam/sdk/coders/BigEndianLongCoder.java | 10 ++++-
.../apache/beam/sdk/coders/BigIntegerCoder.java | 23 +++++++----
.../apache/beam/sdk/coders/ByteArrayCoder.java | 18 ++++++++-
.../org/apache/beam/sdk/coders/ByteCoder.java | 9 +++--
.../apache/beam/sdk/coders/ByteStringCoder.java | 7 ++--
.../org/apache/beam/sdk/coders/CustomCoder.java | 22 ++++++++++-
.../org/apache/beam/sdk/coders/DoubleCoder.java | 9 +++--
.../apache/beam/sdk/coders/DurationCoder.java | 19 +++++----
.../apache/beam/sdk/coders/InstantCoder.java | 25 +++++++-----
.../beam/sdk/coders/SerializableCoder.java | 2 +-
.../apache/beam/sdk/coders/StringUtf8Coder.java | 7 ++--
.../beam/sdk/coders/TextualIntegerCoder.java | 14 +++++--
.../org/apache/beam/sdk/coders/VarIntCoder.java | 12 ++++--
.../apache/beam/sdk/coders/VarLongCoder.java | 22 +++++++++--
.../org/apache/beam/sdk/coders/VoidCoder.java | 5 ++-
.../org/apache/beam/sdk/io/FileBasedSink.java | 9 +----
.../org/apache/beam/sdk/transforms/Mean.java | 10 ++++-
.../sdk/transforms/windowing/GlobalWindow.java | 17 +++++++-
.../transforms/windowing/IntervalWindow.java | 20 ++++++++--
.../beam/sdk/transforms/windowing/PaneInfo.java | 7 +++-
.../org/apache/beam/sdk/util/BitSetCoder.java | 13 +++----
.../apache/beam/sdk/testing/PAssertTest.java | 4 +-
.../sdk/testing/SerializableMatchersTest.java | 4 +-
.../beam/sdk/testing/WindowSupplierTest.java | 4 +-
.../apache/beam/sdk/transforms/CreateTest.java | 3 +-
.../beam/sdk/transforms/GroupByKeyTest.java | 9 +++--
.../apache/beam/sdk/transforms/ParDoTest.java | 6 ++-
.../apache/beam/sdk/util/CoderUtilsTest.java | 4 +-
.../sdk/extensions/protobuf/ProtoCoder.java | 4 +-
.../io/gcp/bigquery/TableDestinationCoder.java | 6 +--
.../sdk/io/gcp/bigquery/TableRowInfoCoder.java | 6 +--
.../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 12 ++++--
.../io/gcp/bigquery/WriteBundlesToFiles.java | 7 ++--
.../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 4 +-
.../io/gcp/pubsub/PubsubUnboundedSource.java | 4 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++--
.../beam/sdk/io/hbase/HBaseMutationCoder.java | 5 +--
.../beam/sdk/io/hbase/HBaseResultCoder.java | 5 +--
sdks/java/io/kafka/pom.xml | 5 ---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 +-
.../beam/sdk/io/kafka/KafkaRecordCoder.java | 16 +-------
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 4 +-
sdks/java/io/xml/pom.xml | 5 ---
.../org/apache/beam/sdk/io/xml/JAXBCoder.java | 39 ++-----------------
51 files changed, 342 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index 3d14a12..c6054b6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -33,8 +33,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -175,7 +175,7 @@ public class CloningBundleFactoryTest {
}
static class Record {}
- static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+ static class RecordNoEncodeCoder extends CustomCoder<Record> {
@Override
public void encode(
@@ -194,7 +194,7 @@ public class CloningBundleFactoryTest {
}
}
- static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+ static class RecordNoDecodeCoder extends CustomCoder<Record> {
@Override
public void encode(
Record value,
@@ -210,7 +210,7 @@ public class CloningBundleFactoryTest {
}
}
- private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+ private static class RecordStructuralValueCoder extends CustomCoder<Record> {
@Override
public void encode(
Record value,
@@ -242,7 +242,7 @@ public class CloningBundleFactoryTest {
}
private static class RecordNotConsistentWithEqualsStructuralValueCoder
- extends AtomicCoder<Record> {
+ extends CustomCoder<Record> {
@Override
public void encode(
Record value,
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 86d7f05..c55f84a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -44,9 +44,9 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -530,7 +530,6 @@ public class DirectRunnerTest implements Serializable {
p.run();
}
-
@Test
public void testAggregatorNotPresentInGraph() throws AggregatorRetrievalException {
Pipeline p = getPipeline();
@@ -560,7 +559,7 @@ public class DirectRunnerTest implements Serializable {
}
}
- private static class LongNoDecodeCoder extends AtomicCoder<Long> {
+ private static class LongNoDecodeCoder extends CustomCoder<Long> {
@Override
public void encode(
Long value, OutputStream outStream, Context context) throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 567ee98..521ba3f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -48,10 +48,10 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
@@ -588,7 +588,7 @@ public class UnboundedReadEvaluatorFactoryTest {
return finalized;
}
- public static class Coder extends AtomicCoder<TestCheckpointMark> {
+ public static class Coder extends CustomCoder<TestCheckpointMark> {
@Override
public void encode(
TestCheckpointMark value,
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
index 33c27f8..97824dc 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java
@@ -37,11 +37,11 @@ import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -628,17 +628,15 @@ public class IsmFormat {
* <li>indexOffset (variable length long encoding)</li>
* </ul>
*/
- public static class IsmShardCoder extends AtomicCoder<IsmShard> {
+ public static class IsmShardCoder extends CustomCoder<IsmShard> {
private static final IsmShardCoder INSTANCE = new IsmShardCoder();
/** Returns an IsmShardCoder. */
- @JsonCreator
public static IsmShardCoder of() {
return INSTANCE;
}
- private IsmShardCoder() {
- }
+ private IsmShardCoder() {}
@Override
public void encode(IsmShard value, OutputStream outStream, Coder.Context context)
@@ -661,9 +659,20 @@ public class IsmFormat {
}
@Override
+ public void verifyDeterministic() {
+ VarIntCoder.of().verifyDeterministic();
+ VarLongCoder.of().verifyDeterministic();
+ }
+
+ @Override
public boolean consistentWithEquals() {
return true;
}
+
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
}
/**
@@ -689,10 +698,9 @@ public class IsmFormat {
}
/** A {@link Coder} for {@link KeyPrefix}. */
- public static final class KeyPrefixCoder extends AtomicCoder<KeyPrefix> {
+ public static final class KeyPrefixCoder extends CustomCoder<KeyPrefix> {
private static final KeyPrefixCoder INSTANCE = new KeyPrefixCoder();
- @JsonCreator
public static KeyPrefixCoder of() {
return INSTANCE;
}
@@ -711,6 +719,9 @@ public class IsmFormat {
}
@Override
+ public void verifyDeterministic() {}
+
+ @Override
public boolean consistentWithEquals() {
return true;
}
@@ -727,6 +738,11 @@ public class IsmFormat {
return VarInt.getLength(value.getSharedKeySize())
+ VarInt.getLength(value.getUnsharedKeySize());
}
+
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
}
/**
@@ -759,10 +775,9 @@ public class IsmFormat {
}
/** A {@link Coder} for {@link Footer}. */
- public static final class FooterCoder extends AtomicCoder<Footer> {
+ public static final class FooterCoder extends CustomCoder<Footer> {
private static final FooterCoder INSTANCE = new FooterCoder();
- @JsonCreator
public static FooterCoder of() {
return INSTANCE;
}
@@ -791,6 +806,9 @@ public class IsmFormat {
}
@Override
+ public void verifyDeterministic() {}
+
+ @Override
public boolean consistentWithEquals() {
return true;
}
@@ -805,5 +823,10 @@ public class IsmFormat {
throws Exception {
return Footer.FIXED_LENGTH;
}
+
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
index 11eec19..4b07ca2 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/RandomAccessData.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.MoreObjects;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.UnsignedBytes;
@@ -31,10 +30,10 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.Comparator;
import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.VarInt;
/**
@@ -56,10 +55,9 @@ public class RandomAccessData {
*
* <p>This coder does not support encoding positive infinity.
*/
- public static class RandomAccessDataCoder extends AtomicCoder<RandomAccessData> {
+ public static class RandomAccessDataCoder extends CustomCoder<RandomAccessData> {
private static final RandomAccessDataCoder INSTANCE = new RandomAccessDataCoder();
- @JsonCreator
public static RandomAccessDataCoder of() {
return INSTANCE;
}
@@ -90,6 +88,9 @@ public class RandomAccessData {
}
@Override
+ public void verifyDeterministic() {}
+
+ @Override
public boolean consistentWithEquals() {
return true;
}
@@ -112,6 +113,11 @@ public class RandomAccessData {
}
return size + value.size;
}
+
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
}
public static final UnsignedLexicographicalComparator UNSIGNED_LEXICOGRAPHICAL_COMPARATOR =
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
index 36c8265..90ace99 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.coders;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,9 +32,8 @@ import java.math.MathContext;
* {@link BigInteger}, when scaled (with unlimited precision, aka {@link MathContext#UNLIMITED}),
* yields the expected {@link BigDecimal}.
*/
-public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
+public class BigDecimalCoder extends CustomCoder<BigDecimal> {
- @JsonCreator
public static BigDecimalCoder of() {
return INSTANCE;
}
@@ -44,8 +42,8 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
private static final BigDecimalCoder INSTANCE = new BigDecimalCoder();
- private final VarIntCoder integerCoder = VarIntCoder.of();
- private final BigIntegerCoder bigIntegerCoder = BigIntegerCoder.of();
+ private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();
+ private static final BigIntegerCoder BIG_INT_CODER = BigIntegerCoder.of();
private BigDecimalCoder() {}
@@ -53,18 +51,24 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
public void encode(BigDecimal value, OutputStream outStream, Context context)
throws IOException, CoderException {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
- integerCoder.encode(value.scale(), outStream, context.nested());
- bigIntegerCoder.encode(value.unscaledValue(), outStream, context);
+ VAR_INT_CODER.encode(value.scale(), outStream, context.nested());
+ BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
}
@Override
public BigDecimal decode(InputStream inStream, Context context)
throws IOException, CoderException {
- int scale = integerCoder.decode(inStream, context.nested());
- BigInteger bigInteger = bigIntegerCoder.decode(inStream, context);
+ int scale = VAR_INT_CODER.decode(inStream, context.nested());
+ BigInteger bigInteger = BIG_INT_CODER.decode(inStream, context);
return new BigDecimal(bigInteger, scale);
}
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ VAR_INT_CODER.verifyDeterministic();
+ BIG_INT_CODER.verifyDeterministic();
+ }
+
/**
* {@inheritDoc}
*
@@ -75,6 +79,11 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
@@ -95,7 +104,7 @@ public class BigDecimalCoder extends AtomicCoder<BigDecimal> {
@Override
protected long getEncodedElementByteSize(BigDecimal value, Context context) throws Exception {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
- return integerCoder.getEncodedElementByteSize(value.scale(), context.nested())
- + bigIntegerCoder.getEncodedElementByteSize(value.unscaledValue(), context);
+ return VAR_INT_CODER.getEncodedElementByteSize(value.scale(), context.nested())
+ + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue(), context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
index 2922416..8f45a56 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianIntegerCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link BigEndianIntegerCoder} encodes {@link Integer Integers} in 4 bytes, big-endian.
*/
-public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
+public class BigEndianIntegerCoder extends CustomCoder<Integer> {
- @JsonCreator
public static BigEndianIntegerCoder of() {
return INSTANCE;
}
@@ -65,6 +63,9 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
}
}
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
@@ -75,6 +76,11 @@ public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
index 26aadde..2d47697 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigEndianLongCoder.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link BigEndianLongCoder} encodes {@link Long}s in 8 bytes, big-endian.
*/
-public class BigEndianLongCoder extends AtomicCoder<Long> {
+public class BigEndianLongCoder extends CustomCoder<Long> {
@JsonCreator
public static BigEndianLongCoder of() {
@@ -65,6 +65,9 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
}
}
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
@@ -75,6 +78,11 @@ public class BigEndianLongCoder extends AtomicCoder<Long> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
index daba983..40331b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigIntegerCoder.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.coders;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -29,9 +28,8 @@ import java.math.BigInteger;
* A {@link BigIntegerCoder} encodes a {@link BigInteger} as a byte array containing the big endian
* two's-complement representation, encoded via {@link ByteArrayCoder}.
*/
-public class BigIntegerCoder extends AtomicCoder<BigInteger> {
+public class BigIntegerCoder extends CustomCoder<BigInteger> {
- @JsonCreator
public static BigIntegerCoder of() {
return INSTANCE;
}
@@ -39,22 +37,26 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
/////////////////////////////////////////////////////////////////////////////
private static final BigIntegerCoder INSTANCE = new BigIntegerCoder();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
private BigIntegerCoder() {}
- private final ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();
-
@Override
public void encode(BigInteger value, OutputStream outStream, Context context)
throws IOException, CoderException {
checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
- byteArrayCoder.encode(value.toByteArray(), outStream, context);
+ BYTE_ARRAY_CODER.encode(value.toByteArray(), outStream, context);
}
@Override
public BigInteger decode(InputStream inStream, Context context)
throws IOException, CoderException {
- return new BigInteger(byteArrayCoder.decode(inStream, context));
+ return new BigInteger(BYTE_ARRAY_CODER.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ BYTE_ARRAY_CODER.verifyDeterministic();
}
/**
@@ -67,6 +69,11 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
@@ -85,6 +92,6 @@ public class BigIntegerCoder extends AtomicCoder<BigInteger> {
@Override
protected long getEncodedElementByteSize(BigInteger value, Context context) throws Exception {
checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
- return byteArrayCoder.getEncodedElementByteSize(value.toByteArray(), context);
+ return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray(), context);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
index a9449c6..dd34f28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteArrayCoder.java
@@ -22,6 +22,8 @@ import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
import org.apache.beam.sdk.util.StreamUtils;
import org.apache.beam.sdk.util.VarInt;
@@ -38,13 +40,19 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* encoded via a {@link VarIntCoder}.</li>
* </ul>
*/
-public class ByteArrayCoder extends AtomicCoder<byte[]> {
+public class ByteArrayCoder extends StandardCoder<byte[]> {
@JsonCreator
public static ByteArrayCoder of() {
return INSTANCE;
}
+ /**
+ * Returns an empty list. {@link ByteArrayCoder} has no components.
+ */
+ public static <T> List<Object> getInstanceComponents(T ignored) {
+ return Collections.emptyList();
+ }
/////////////////////////////////////////////////////////////////////////////
@@ -103,6 +111,14 @@ public class ByteArrayCoder extends AtomicCoder<byte[]> {
}
}
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {}
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
index 0eda58d..2ef166b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -28,9 +27,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link ByteCoder} encodes {@link Byte} values in 1 byte using Java serialization.
*/
-public class ByteCoder extends AtomicCoder<Byte> {
+public class ByteCoder extends CustomCoder<Byte> {
- @JsonCreator
public static ByteCoder of() {
return INSTANCE;
}
@@ -87,6 +85,11 @@ public class ByteCoder extends AtomicCoder<Byte> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
index 1e3634c..1b27b5b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ByteStringCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.io.ByteStreams;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -32,9 +31,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* <p>When this coder is used in a nested {@link Coder.Context}, the serialized {@link ByteString}
* objects are first delimited by their size.
*/
-public class ByteStringCoder extends AtomicCoder<ByteString> {
+public class ByteStringCoder extends CustomCoder<ByteString> {
- @JsonCreator
public static ByteStringCoder of() {
return INSTANCE;
}
@@ -84,6 +82,9 @@ public class ByteStringCoder extends AtomicCoder<ByteString> {
return VarInt.getLength(size) + size;
}
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
index 59d29de..fbf65df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CustomCoder.java
@@ -22,6 +22,8 @@ import static org.apache.beam.sdk.util.Structs.addString;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
@@ -39,7 +41,7 @@ import org.apache.beam.sdk.util.StringUtils;
*
* @param <T> the type of elements handled by this coder
*/
-public abstract class CustomCoder<T> extends AtomicCoder<T>
+public abstract class CustomCoder<T> extends StandardCoder<T>
implements Serializable {
@JsonCreator
public static CustomCoder<?> of(
@@ -62,6 +64,24 @@ public abstract class CustomCoder<T> extends AtomicCoder<T>
}
/**
+ * {@inheritDoc}
+ *
+ * <p>Returns an empty list. A {@link CustomCoder} has no default argument {@link Coder coders}.
+ */
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Returns an empty list. A {@link CustomCoder} by default will not have component coders that are
+ * used for inference.
+ */
+ public static <T> List<Object> getInstanceComponents(T exampleValue) {
+ return Collections.emptyList();
+ }
+
+ /**
* {@inheritDoc}
*
* @return A thin {@link CloudObject} wrapping of the Java serialization of {@code this}.
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
index 771c851..8731e5a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DoubleCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
@@ -30,9 +29,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link DoubleCoder} encodes {@link Double} values in 8 bytes using Java serialization.
*/
-public class DoubleCoder extends AtomicCoder<Double> {
+public class DoubleCoder extends CustomCoder<Double> {
- @JsonCreator
public static DoubleCoder of() {
return INSTANCE;
}
@@ -89,6 +87,11 @@ public class DoubleCoder extends AtomicCoder<Double> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
index c6f0b18..10a83ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,9 +29,8 @@ import org.joda.time.ReadableDuration;
* A {@link Coder} that encodes a joda {@link Duration} as a {@link Long} using the format of
* {@link VarLongCoder}.
*/
-public class DurationCoder extends AtomicCoder<ReadableDuration> {
+public class DurationCoder extends CustomCoder<ReadableDuration> {
- @JsonCreator
public static DurationCoder of() {
return INSTANCE;
}
@@ -43,7 +41,7 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
private static final TypeDescriptor<ReadableDuration> TYPE_DESCRIPTOR =
new TypeDescriptor<ReadableDuration>() {};
- private final VarLongCoder longCoder = VarLongCoder.of();
+ private static final VarLongCoder LONG_CODER = VarLongCoder.of();
private DurationCoder() {}
@@ -61,13 +59,18 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
if (value == null) {
throw new CoderException("cannot encode a null ReadableDuration");
}
- longCoder.encode(toLong(value), outStream, context);
+ LONG_CODER.encode(toLong(value), outStream, context);
}
@Override
public ReadableDuration decode(InputStream inStream, Context context)
throws CoderException, IOException {
- return fromLong(longCoder.decode(inStream, context));
+ return fromLong(LONG_CODER.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ LONG_CODER.verifyDeterministic();
}
/**
@@ -87,13 +90,13 @@ public class DurationCoder extends AtomicCoder<ReadableDuration> {
*/
@Override
public boolean isRegisterByteSizeObserverCheap(ReadableDuration value, Context context) {
- return longCoder.isRegisterByteSizeObserverCheap(toLong(value), context);
+ return LONG_CODER.isRegisterByteSizeObserverCheap(toLong(value), context);
}
@Override
public void registerByteSizeObserver(
ReadableDuration value, ElementByteSizeObserver observer, Context context) throws Exception {
- longCoder.registerByteSizeObserver(toLong(value), observer, context);
+ LONG_CODER.registerByteSizeObserver(toLong(value), observer, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
index 325a7db..48b7275 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Converter;
import java.io.IOException;
import java.io.InputStream;
@@ -30,9 +29,7 @@ import org.joda.time.Instant;
* A {@link Coder} for joda {@link Instant} that encodes it as a big endian {@link Long}
* shifted such that lexicographic ordering of the bytes corresponds to chronological order.
*/
-public class InstantCoder extends AtomicCoder<Instant> {
-
- @JsonCreator
+public class InstantCoder extends CustomCoder<Instant> {
public static InstantCoder of() {
return INSTANCE;
}
@@ -42,7 +39,7 @@ public class InstantCoder extends AtomicCoder<Instant> {
private static final InstantCoder INSTANCE = new InstantCoder();
private static final TypeDescriptor<Instant> TYPE_DESCRIPTOR = new TypeDescriptor<Instant>() {};
- private final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+ private static final BigEndianLongCoder LONG_CODER = BigEndianLongCoder.of();
private InstantCoder() {}
@@ -76,13 +73,18 @@ public class InstantCoder extends AtomicCoder<Instant> {
if (value == null) {
throw new CoderException("cannot encode a null Instant");
}
- longCoder.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
+ LONG_CODER.encode(ORDER_PRESERVING_CONVERTER.convert(value), outStream, context);
}
@Override
public Instant decode(InputStream inStream, Context context)
throws CoderException, IOException {
- return ORDER_PRESERVING_CONVERTER.reverse().convert(longCoder.decode(inStream, context));
+ return ORDER_PRESERVING_CONVERTER.reverse().convert(LONG_CODER.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ LONG_CODER.verifyDeterministic();
}
/**
@@ -95,6 +97,11 @@ public class InstantCoder extends AtomicCoder<Instant> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
@@ -102,14 +109,14 @@ public class InstantCoder extends AtomicCoder<Instant> {
*/
@Override
public boolean isRegisterByteSizeObserverCheap(Instant value, Context context) {
- return longCoder.isRegisterByteSizeObserverCheap(
+ return LONG_CODER.isRegisterByteSizeObserverCheap(
ORDER_PRESERVING_CONVERTER.convert(value), context);
}
@Override
public void registerByteSizeObserver(
Instant value, ElementByteSizeObserver observer, Context context) throws Exception {
- longCoder.registerByteSizeObserver(
+ LONG_CODER.registerByteSizeObserver(
ORDER_PRESERVING_CONVERTER.convert(value), observer, context);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
index 49f5b8d..1314a6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/SerializableCoder.java
@@ -46,7 +46,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* @param <T> the type of elements handled by this coder
*/
-public class SerializableCoder<T extends Serializable> extends AtomicCoder<T> {
+public class SerializableCoder<T extends Serializable> extends CustomCoder<T> {
/**
* Returns a {@link SerializableCoder} instance for the provided element type.
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
index cd124ef..f0a0969 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringUtf8Coder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Utf8;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
@@ -39,9 +38,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* If in a nested context, prefixes the string with an integer length field,
* encoded via a {@link VarIntCoder}.
*/
-public class StringUtf8Coder extends AtomicCoder<String> {
+public class StringUtf8Coder extends CustomCoder<String> {
- @JsonCreator
public static StringUtf8Coder of() {
return INSTANCE;
}
@@ -105,6 +103,9 @@ public class StringUtf8Coder extends AtomicCoder<String> {
}
}
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
index 1b79e90..817817b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/TextualIntegerCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -27,9 +26,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* A {@link Coder} that encodes {@link Integer Integers} as the ASCII bytes of
* their textual decimal representation.
*/
-public class TextualIntegerCoder extends AtomicCoder<Integer> {
+public class TextualIntegerCoder extends CustomCoder<Integer> {
- @JsonCreator
public static TextualIntegerCoder of() {
return new TextualIntegerCoder();
}
@@ -62,6 +60,16 @@ public class TextualIntegerCoder extends AtomicCoder<Integer> {
}
@Override
+ public void verifyDeterministic() {
+ StringUtf8Coder.of().verifyDeterministic();
+ }
+
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
+ @Override
public TypeDescriptor<Integer> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
index ec9d8f4..9c654a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarIntCoder.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -31,9 +30,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* numbers always take 5 bytes, so {@link BigEndianIntegerCoder} may be preferable for
* integers that are known to often be large or negative.
*/
-public class VarIntCoder extends AtomicCoder<Integer> {
+public class VarIntCoder extends CustomCoder<Integer> {
- @JsonCreator
public static VarIntCoder of() {
return INSTANCE;
}
@@ -66,6 +64,9 @@ public class VarIntCoder extends AtomicCoder<Integer> {
}
}
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
@@ -76,6 +77,11 @@ public class VarIntCoder extends AtomicCoder<Integer> {
return true;
}
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
index 3f1334d..16474ba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VarLongCoder.java
@@ -17,12 +17,13 @@
*/
package org.apache.beam.sdk.coders;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UTFDataFormatException;
+import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -31,13 +32,18 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* numbers always take 10 bytes, so {@link BigEndianLongCoder} may be preferable for
* longs that are known to often be large or negative.
*/
-public class VarLongCoder extends AtomicCoder<Long> {
-
- @JsonCreator
+public class VarLongCoder extends StandardCoder<Long> {
public static VarLongCoder of() {
return INSTANCE;
}
+ /**
+ * Returns an empty list. {@link VarLongCoder} has no components.
+ */
+ public static <T> List<Object> getInstanceComponents(T ignored) {
+ return Collections.emptyList();
+ }
+
/////////////////////////////////////////////////////////////////////////////
private static final VarLongCoder INSTANCE = new VarLongCoder();
@@ -66,6 +72,14 @@ public class VarLongCoder extends AtomicCoder<Long> {
}
}
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
index 318485d..a65fa5e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/VoidCoder.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link Coder} for {@link Void}. Uses zero bytes per {@link Void}.
*/
-public class VoidCoder extends AtomicCoder<Void> {
+public class VoidCoder extends CustomCoder<Void> {
@JsonCreator
public static VoidCoder of() {
@@ -50,6 +50,9 @@ public class VoidCoder extends AtomicCoder<Void> {
return null;
}
+ @Override
+ public void verifyDeterministic() {}
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d9682e7..6045148 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -22,11 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -41,12 +39,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
-
import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
@@ -936,11 +932,10 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
/**
* A coder for FileResult objects.
*/
- public static final class FileResultCoder extends AtomicCoder<FileResult> {
+ public static final class FileResultCoder extends CustomCoder<FileResult> {
private static final FileResultCoder INSTANCE = new FileResultCoder();
private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
- @JsonCreator
public static FileResultCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
index 5e7c003..a6808cf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Mean.java
@@ -22,11 +22,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator;
@@ -181,7 +181,7 @@ public class Mean {
}
static class CountSumCoder<NumT extends Number>
- extends AtomicCoder<CountSum<NumT>> {
+ extends CustomCoder<CountSum<NumT>> {
private static final Coder<Long> LONG_CODER = BigEndianLongCoder.of();
private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
@@ -199,5 +199,11 @@ public class Mean {
LONG_CODER.decode(inStream, context.nested()),
DOUBLE_CODER.decode(inStream, context));
}
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ LONG_CODER.verifyDeterministic();
+ DOUBLE_CODER.verifyDeterministic();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
index ad6a9fd..ffc8011 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.transforms.windowing;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.util.CloudObject;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -62,7 +64,7 @@ public class GlobalWindow extends BoundedWindow {
/**
* {@link Coder} for encoding and decoding {@code GlobalWindow}s.
*/
- public static class Coder extends AtomicCoder<GlobalWindow> {
+ public static class Coder extends StandardCoder<GlobalWindow> {
public static final Coder INSTANCE = new Coder();
@Override
@@ -86,6 +88,17 @@ public class GlobalWindow extends BoundedWindow {
return CloudObject.forClassName("kind:global_window");
}
+ @Override
+ public final List<org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Returns an empty list. The Global Window Coder has no components.
+ */
+ public static <T> List<Object> getInstanceComponents(T exampleValue) {
+ return Collections.emptyList();
+ }
private Coder() {}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index eff4d99..aaa2e83 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -17,15 +17,16 @@
*/
package org.apache.beam.sdk.transforms.windowing;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import java.util.Collections;
+import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.util.CloudObject;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -167,18 +168,24 @@ public class IntervalWindow extends BoundedWindow
/**
* Encodes an {@link IntervalWindow} as a pair of its upper bound and duration.
*/
- public static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> {
+ public static class IntervalWindowCoder extends StandardCoder<IntervalWindow> {
private static final IntervalWindowCoder INSTANCE = new IntervalWindowCoder();
private static final Coder<Instant> instantCoder = InstantCoder.of();
private static final Coder<ReadableDuration> durationCoder = DurationCoder.of();
- @JsonCreator
public static IntervalWindowCoder of() {
return INSTANCE;
}
+ /**
+ * Returns an empty list. {@link IntervalWindowCoder} has no components.
+ */
+ public static <T> List<Object> getInstanceComponents(T value) {
+ return Collections.emptyList();
+ }
+
@Override
public void encode(IntervalWindow window, OutputStream outStream, Context context)
throws IOException, CoderException {
@@ -206,6 +213,11 @@ public class IntervalWindow extends BoundedWindow
}
@Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
protected CloudObject initializeCloudObject() {
return CloudObject.forClassName("kind:interval_window");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index c73eb39..158bb04 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -26,9 +26,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.util.VarInt;
@@ -306,7 +306,7 @@ public final class PaneInfo {
/**
* A Coder for encoding PaneInfo instances.
*/
- public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
+ public static class PaneInfoCoder extends CustomCoder<PaneInfo> {
private enum Encoding {
FIRST,
ONE_INDEX,
@@ -383,5 +383,8 @@ public final class PaneInfo {
}
return new PaneInfo(base.isFirst, base.isLast, base.timing, index, onTimeIndex);
}
+
+ @Override
+ public void verifyDeterministic() {}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
index 72524bd..eda4e5f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BitSetCoder.java
@@ -21,17 +21,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.BitSet;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
/**
* Coder for the BitSet used to track child-trigger finished states.
*/
-public class BitSetCoder extends AtomicCoder<BitSet> {
+public class BitSetCoder extends CustomCoder<BitSet> {
private static final BitSetCoder INSTANCE = new BitSetCoder();
- private static final ByteArrayCoder byteArrayCoder = ByteArrayCoder.of();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
private BitSetCoder() {}
@@ -42,19 +42,18 @@ public class BitSetCoder extends AtomicCoder<BitSet> {
@Override
public void encode(BitSet value, OutputStream outStream, Context context)
throws CoderException, IOException {
- byteArrayCoder.encodeAndOwn(value.toByteArray(), outStream, context);
+ BYTE_ARRAY_CODER.encodeAndOwn(value.toByteArray(), outStream, context);
}
@Override
public BitSet decode(InputStream inStream, Context context)
throws CoderException, IOException {
- return BitSet.valueOf(byteArrayCoder.decode(inStream, context));
+ return BitSet.valueOf(BYTE_ARRAY_CODER.decode(inStream, context));
}
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
- "BitSetCoder requires its byteArrayCoder to be deterministic.",
- byteArrayCoder);
+ "BitSetCoder requires its ByteArrayCoder to be deterministic.", BYTE_ARRAY_CODER);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 2ef892c..cfe7436 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
import java.util.Collections;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.GenerateSequence;
@@ -88,7 +88,7 @@ public class PAssertTest implements Serializable {
}
}
- private static class NotSerializableObjectCoder extends AtomicCoder<NotSerializableObject> {
+ private static class NotSerializableObjectCoder extends CustomCoder<NotSerializableObject> {
private NotSerializableObjectCoder() { }
private static final NotSerializableObjectCoder INSTANCE = new NotSerializableObjectCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
index db5ff2e..ddc92d6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SerializableMatchersTest.java
@@ -29,8 +29,8 @@ import com.google.common.collect.ImmutableList;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matchers;
@@ -151,7 +151,7 @@ public class SerializableMatchersTest implements Serializable {
}
}
- private static class NotSerializableClassCoder extends AtomicCoder<NotSerializableClass> {
+ private static class NotSerializableClassCoder extends CustomCoder<NotSerializableClass> {
@Override
public void encode(NotSerializableClass value, OutputStream outStream, Coder.Context context) {
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
index 1ab4c27..38a2fa2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/WindowSupplierTest.java
@@ -24,8 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -72,7 +72,7 @@ public class WindowSupplierTest {
Collections.<BoundedWindow>singleton(window));
}
- private static class FailingCoder extends AtomicCoder<BoundedWindow> {
+ private static class FailingCoder extends CustomCoder<BoundedWindow> {
@Override
public void encode(
BoundedWindow value, OutputStream outStream, Context context)
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 8a30476..89a1f33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -36,7 +36,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -202,7 +201,7 @@ public class CreateTest {
return myString.equals(((UnserializableRecord) o).myString);
}
- static class UnserializableRecordCoder extends AtomicCoder<UnserializableRecord> {
+ static class UnserializableRecordCoder extends CustomCoder<UnserializableRecord> {
private final Coder<String> stringCoder = StringUtf8Coder.of();
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 3443847..939261f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -26,7 +26,6 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInA
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
@@ -39,9 +38,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -455,9 +454,8 @@ public class GroupByKeyTest {
/**
* Deterministic {@link Coder} for {@link BadEqualityKey}.
*/
- static class DeterministicKeyCoder extends AtomicCoder<BadEqualityKey> {
+ static class DeterministicKeyCoder extends CustomCoder<BadEqualityKey> {
- @JsonCreator
public static DeterministicKeyCoder of() {
return INSTANCE;
}
@@ -480,6 +478,9 @@ public class GroupByKeyTest {
throws IOException {
return new BadEqualityKey(new DataInputStream(inStream).readLong());
}
+
+ @Override
+ public void verifyDeterministic() {}
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3424f86..1a976f2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,7 +55,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
@@ -979,7 +978,7 @@ public class ParDoTest implements Serializable {
private static class TestDummy { }
- private static class TestDummyCoder extends AtomicCoder<TestDummy> {
+ private static class TestDummyCoder extends CustomCoder<TestDummy> {
private TestDummyCoder() { }
private static final TestDummyCoder INSTANCE = new TestDummyCoder();
@@ -1015,6 +1014,9 @@ public class ParDoTest implements Serializable {
throws Exception {
observer.update(0L);
}
+
+ @Override
+ public void verifyDeterministic() {}
}
private static class TaggedOutputDummyFn extends DoFn<Integer, Integer> {
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
index 4bd2f19..32c2af4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CoderUtilsTest.java
@@ -25,11 +25,11 @@ import static org.mockito.Mockito.mock;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -52,7 +52,7 @@ public class CoderUtilsTest {
@Rule
public transient ExpectedException expectedException = ExpectedException.none();
- static class TestCoder extends AtomicCoder<Integer> {
+ static class TestCoder extends CustomCoder<Integer> {
public static TestCoder of() {
return new TestCoder();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
index 99a0838..ece3eca 100644
--- a/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
+++ b/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java
@@ -40,12 +40,12 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderProvider;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.values.PCollection;
@@ -110,7 +110,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* @param <T> the Protocol Buffers {@link Message} handled by this {@link Coder}.
*/
-public class ProtoCoder<T extends Message> extends AtomicCoder<T> {
+public class ProtoCoder<T extends Message> extends CustomCoder<T> {
/**
* A {@link CoderProvider} that returns a {@link ProtoCoder} with an empty
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index 262a00d..c418804 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -18,20 +18,18 @@
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/** A coder for {@link TableDestination} objects. */
-public class TableDestinationCoder extends AtomicCoder<TableDestination> {
+public class TableDestinationCoder extends CustomCoder<TableDestination> {
private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
- @JsonCreator
public static TableDestinationCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
index 9ef947e..c3e48a4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowInfoCoder.java
@@ -18,23 +18,21 @@
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/**
* Defines a coder for {@link TableRowInfo} objects.
*/
@VisibleForTesting
-class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
+class TableRowInfoCoder extends CustomCoder<TableRowInfo> {
private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
- @JsonCreator
public static TableRowInfoCoder of() {
return INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index ce4b669..e5f8591 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -17,24 +17,22 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
* A {@link Coder} that encodes BigQuery {@link TableRow} objects in their native JSON format.
*/
-public class TableRowJsonCoder extends AtomicCoder<TableRow> {
+public class TableRowJsonCoder extends CustomCoder<TableRow> {
- @JsonCreator
public static TableRowJsonCoder of() {
return INSTANCE;
}
@@ -88,4 +86,10 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
public TypeDescriptor<TableRow> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;
}
+
+ @Override
+ public String getEncodingId() {
+ return "";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index a25cc90..d337476 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -26,8 +26,8 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.UUID;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.DoFn;
@@ -67,7 +67,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
/**
* a coder for the {@link Result} class.
*/
- public static class ResultCoder extends AtomicCoder<Result> {
+ public static class ResultCoder extends CustomCoder<Result> {
private static final ResultCoder INSTANCE = new ResultCoder();
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
private static final VarLongCoder longCoder = VarLongCoder.of();
@@ -98,8 +98,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
}
@Override
- public void verifyDeterministic() throws NonDeterministicException {
- }
+ public void verifyDeterministic() {}
}
WriteBundlesToFiles(String tempFilePrefix) {
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 3ce9224..f1dc1e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -32,11 +32,11 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
@@ -103,7 +103,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
/**
* Coder for conveying outgoing messages between internal stages.
*/
- private static class OutgoingMessageCoder extends AtomicCoder<OutgoingMessage> {
+ private static class OutgoingMessageCoder extends CustomCoder<OutgoingMessage> {
private static final NullableCoder<String> RECORD_ID_CODER =
NullableCoder.of(StringUtf8Coder.of());
private static final NullableCoder<Map<String, String>> ATTRIBUTES_CODER =
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 0389d4b..558b944 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -46,9 +46,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -359,7 +359,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
/** The coder for our checkpoints. */
- private static class PubsubCheckpointCoder<T> extends AtomicCoder<PubsubCheckpoint<T>> {
+ private static class PubsubCheckpointCoder<T> extends CustomCoder<PubsubCheckpoint<T>> {
private static final Coder<String> SUBSCRIPTION_PATH_CODER =
NullableCoder.of(StringUtf8Coder.of());
private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index e11dd74..e0a5fac 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatistics;
@@ -44,7 +45,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
@@ -64,12 +64,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -590,7 +589,7 @@ public class BigQueryIOTest implements Serializable {
/**
* Coder for @link{PartitionedGlobalWindow}.
*/
- private static class PartitionedGlobalWindowCoder extends AtomicCoder<PartitionedGlobalWindow> {
+ private static class PartitionedGlobalWindowCoder extends CustomCoder<PartitionedGlobalWindow> {
@Override
public void encode(PartitionedGlobalWindow window, OutputStream outStream, Context context)
throws IOException, CoderException {
@@ -602,6 +601,9 @@ public class BigQueryIOTest implements Serializable {
throws IOException, CoderException {
return new PartitionedGlobalWindow(StringUtf8Coder.of().decode(inStream, context));
}
+
+ @Override
+ public void verifyDeterministic() {}
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
index 228e0b4..35a8863 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseMutationCoder.java
@@ -21,9 +21,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -35,7 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.Mut
* A {@link Coder} that serializes and deserializes the {@link Mutation} objects using {@link
* ProtobufUtil}.
*/
-class HBaseMutationCoder extends AtomicCoder<Mutation> implements Serializable {
+class HBaseMutationCoder extends CustomCoder<Mutation> implements Serializable {
private static final HBaseMutationCoder INSTANCE = new HBaseMutationCoder();
private HBaseMutationCoder() {}
[2/3] beam git commit: Stop Extending AtomicCoder
Posted by tg...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
index 94f324a..0004d03 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseResultCoder.java
@@ -21,9 +21,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -32,7 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
* A {@link Coder} that serializes and deserializes the {@link Result} objects using {@link
* ProtobufUtil}.
*/
-class HBaseResultCoder extends AtomicCoder<Result> implements Serializable {
+class HBaseResultCoder extends CustomCoder<Result> implements Serializable {
private static final HBaseResultCoder INSTANCE = new HBaseResultCoder();
private HBaseResultCoder() {}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
index 785699c..2ae940d 100644
--- a/sdks/java/io/kafka/pom.xml
+++ b/sdks/java/io/kafka/pom.xml
@@ -83,11 +83,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index fbd96eb..68efb9a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -53,11 +53,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -1321,7 +1321,7 @@ public class KafkaIO {
}
}
- private static class NullOnlyCoder<T> extends AtomicCoder<T> {
+ private static class NullOnlyCoder<T> extends CustomCoder<T> {
@Override
public void encode(T value, OutputStream outStream, Context context) {
checkArgument(value == null, "Can only encode nulls");
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index 25ef7df..160e8ce 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -17,28 +17,23 @@
*/
package org.apache.beam.sdk.io.kafka;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
-
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.values.KV;
/**
* {@link Coder} for {@link KafkaRecord}.
*/
-public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
+public class KafkaRecordCoder<K, V> extends CustomCoder<KafkaRecord<K, V>> {
private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
private static final VarLongCoder longCoder = VarLongCoder.of();
@@ -46,13 +41,6 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
private final KvCoder<K, V> kvCoder;
- @JsonCreator
- public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- KvCoder<?, ?> kvCoder = KvCoder.of(components);
- return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
- }
-
public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index 77fe127..4da2b05 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,9 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -32,7 +32,7 @@ import org.joda.time.Instant;
/**
* A {@link Coder} for {@link KinesisRecord}.
*/
-class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
+class KinesisRecordCoder extends CustomCoder<KinesisRecord> {
private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
private static final InstantCoder INSTANT_CODER = InstantCoder.of();
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/xml/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/pom.xml b/sdks/java/io/xml/pom.xml
index 49ce239..51f1c6c 100644
--- a/sdks/java/io/xml/pom.xml
+++ b/sdks/java/io/xml/pom.xml
@@ -57,11 +57,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/7ad9efc7/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
index b8b1b79..1e2e07c 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/JAXBCoder.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.io.xml;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.io.ByteStreams;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
@@ -29,11 +27,9 @@ import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.util.CloudObject;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
-import org.apache.beam.sdk.util.Structs;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -43,10 +39,9 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*
* @param <T> type of JAXB annotated objects that will be serialized.
*/
-public class JAXBCoder<T> extends AtomicCoder<T> {
+public class JAXBCoder<T> extends CustomCoder<T> {
private final Class<T> jaxbClass;
- private final TypeDescriptor<T> typeDescriptor;
private transient volatile JAXBContext jaxbContext;
private final EmptyOnDeserializationThreadLocal<Marshaller> jaxbMarshaller;
private final EmptyOnDeserializationThreadLocal<Unmarshaller> jaxbUnmarshaller;
@@ -57,7 +52,6 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
private JAXBCoder(Class<T> jaxbClass) {
this.jaxbClass = jaxbClass;
- this.typeDescriptor = TypeDescriptor.of(jaxbClass);
this.jaxbMarshaller = new EmptyOnDeserializationThreadLocal<Marshaller>() {
@Override
protected Marshaller initialValue() {
@@ -147,7 +141,7 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
@Override
public TypeDescriptor<T> getEncodedTypeDescriptor() {
- return typeDescriptor;
+ return TypeDescriptor.of(jaxbClass);
}
private static class CloseIgnoringInputStream extends FilterInputStream {
@@ -173,31 +167,4 @@ public class JAXBCoder<T> extends AtomicCoder<T> {
// JAXB closes the underlying stream so we must filter out those calls.
}
}
-
- ////////////////////////////////////////////////////////////////////////////////////
- // JSON Serialization details below
-
- private static final String JAXB_CLASS = "jaxb_class";
-
- /**
- * Constructor for JSON deserialization only.
- */
- @JsonCreator
- public static <T> JAXBCoder<T> of(
- @JsonProperty(JAXB_CLASS) String jaxbClassName) {
- try {
- @SuppressWarnings("unchecked")
- Class<T> jaxbClass = (Class<T>) Class.forName(jaxbClassName);
- return of(jaxbClass);
- } catch (ClassNotFoundException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- protected CloudObject initializeCloudObject() {
- CloudObject result = CloudObject.forClass(getClass());
- Structs.addString(result, JAXB_CLASS, jaxbClass.getName());
- return result;
- }
}