You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:38 UTC

[30/55] [abbrv] beam git commit: Fix compile after Coders and Pubsub refactor

Fix compile after Coders and Pubsub refactor


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8b96949b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8b96949b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8b96949b

Branch: refs/heads/master
Commit: 8b96949b934be1df7102aeb24ef4b23d9dd28812
Parents: b438fa7
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Fri Apr 28 10:29:38 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 .../integration/nexmark/NexmarkOptions.java     |  3 +--
 .../beam/integration/nexmark/WinningBids.java   | 23 +++++++++++---------
 .../integration/nexmark/io/PubsubClient.java    |  2 +-
 .../integration/nexmark/io/PubsubHelper.java    |  2 +-
 .../nexmark/io/PubsubJsonClient.java            |  2 +-
 .../nexmark/io/PubsubTestClient.java            |  2 +-
 .../beam/integration/nexmark/model/Auction.java |  6 ++---
 .../integration/nexmark/model/AuctionBid.java   |  6 ++---
 .../integration/nexmark/model/AuctionCount.java |  6 ++---
 .../integration/nexmark/model/AuctionPrice.java |  6 ++---
 .../beam/integration/nexmark/model/Bid.java     |  8 +++----
 .../nexmark/model/BidsPerSession.java           |  7 +++---
 .../nexmark/model/CategoryPrice.java            |  7 +++---
 .../beam/integration/nexmark/model/Done.java    |  7 +++---
 .../beam/integration/nexmark/model/Event.java   |  6 ++---
 .../nexmark/model/IdNameReserve.java            |  7 +++---
 .../nexmark/model/NameCityStateId.java          |  7 +++---
 .../beam/integration/nexmark/model/Person.java  |  7 +++---
 .../integration/nexmark/model/SellerPrice.java  |  7 +++---
 .../integration/nexmark/sources/Generator.java  |  6 ++---
 20 files changed, 57 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index 5d093ae..e1c1af2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -18,10 +18,9 @@
 package org.apache.beam.integration.nexmark;
 
 import javax.annotation.Nullable;
-
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PubsubOptions;
 
 /**
  * Command line flags.

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
index f2566b8..3815b9d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -35,9 +35,9 @@ import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.metrics.Counter;
@@ -145,7 +145,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
   /**
    * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long.
    */
-  private static class AuctionOrBidWindowCoder extends AtomicCoder<AuctionOrBidWindow> {
+  private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
     private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
     private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
     private static final Coder<Long> ID_CODER = VarLongCoder.of();
@@ -157,22 +157,25 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
     }
 
     @Override
-    public void encode(AuctionOrBidWindow window, OutputStream outStream, Context context)
+    public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
         throws IOException, CoderException {
-      SUPER_CODER.encode(window, outStream, Context.NESTED);
-      ID_CODER.encode(window.auction, outStream, Context.NESTED);
-      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Context.NESTED);
+      SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
+      ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
+      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
     }
 
     @Override
-    public AuctionOrBidWindow decode(InputStream inStream, Context context)
+    public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
         throws IOException, CoderException {
-      IntervalWindow superWindow = SUPER_CODER.decode(inStream, Context.NESTED);
-      long auction = ID_CODER.decode(inStream, Context.NESTED);
-      boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) == 0 ? false : true;
+      IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
+      long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
+      boolean isAuctionWindow =
+          INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
       return new AuctionOrBidWindow(
           superWindow.start(), superWindow.end(), auction, isAuctionWindow);
     }
+
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   }
 
   /** Assign events to auction windows and merges them intelligently. */

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
index 687aa35..931fe6e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
@@ -32,7 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 
 /**
  * An (abstract) helper class for talking to Pubsub via an underlying transport.

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
index 15401b7..bcc5b1c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 
 /**
  * Helper for working with pubsub.

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
index b778a09..afddbd8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
@@ -47,7 +47,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.Transport;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
index 125a8d6..69ba2b0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
@@ -33,7 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
 
 /**
  * A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
index ac30568..4b1a848 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
@@ -39,7 +37,7 @@ public class Auction implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
 
-  public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
+  public static final Coder<Auction> CODER = new CustomCoder<Auction>() {
     @Override
     public void encode(Auction value, OutputStream outStream,
         Coder.Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
index c014257..7f6b7c9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -19,23 +19,21 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.WinningBids;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 
 /**
  * Result of {@link WinningBids} transform.
  */
 public class AuctionBid implements KnownSize, Serializable {
-  public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
+  public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
     @Override
     public void encode(AuctionBid value, OutputStream outStream,
         Coder.Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
index aa16629..e6d3450 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
 /**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 public class AuctionCount implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
 
-  public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
+  public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() {
     @Override
     public void encode(AuctionCount value, OutputStream outStream,
         Coder.Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
index f365cc8..cb971e2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
 /**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 public class AuctionPrice implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
 
-  public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
+  public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() {
     @Override
     public void encode(AuctionPrice value, OutputStream outStream,
         Coder.Context context)

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
index 59a33c1..faeb928 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
@@ -19,17 +19,15 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Comparator;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
@@ -40,7 +38,7 @@ public class Bid implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
 
-  public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
+  public static final Coder<Bid> CODER = new CustomCoder<Bid>() {
     @Override
     public void encode(Bid value, OutputStream outStream,
         Coder.Context context)
@@ -63,6 +61,8 @@ public class Bid implements KnownSize, Serializable {
       String extra = STRING_CODER.decode(inStream, Context.NESTED);
       return new Bid(auction, bidder, price, dateTime, extra);
     }
+
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
index 7c4dfae..26b6a41 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
 /**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 public class BidsPerSession implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
 
-  public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
+  public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() {
     @Override
     public void encode(BidsPerSession value, OutputStream outStream,
         Coder.Context context)
@@ -54,6 +52,7 @@ public class BidsPerSession implements KnownSize, Serializable {
       long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
       return new BidsPerSession(personId, bidsPerSession);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
index 6512cc1..ccb2bc7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
@@ -39,7 +37,7 @@ public class CategoryPrice implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
   private static final Coder<Integer> INT_CODER = VarIntCoder.of();
 
-  public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
+  public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() {
     @Override
     public void encode(CategoryPrice value, OutputStream outStream,
         Coder.Context context)
@@ -58,6 +56,7 @@ public class CategoryPrice implements KnownSize, Serializable {
       boolean isLast = INT_CODER.decode(inStream, context) != 0;
       return new CategoryPrice(category, price, isLast);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
index 6009463..42999cd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 public class Done implements KnownSize, Serializable {
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
 
-  public static final Coder<Done> CODER = new AtomicCoder<Done>() {
+  public static final Coder<Done> CODER = new CustomCoder<Done>() {
     @Override
     public void encode(Done value, OutputStream outStream,
         Coder.Context context)
@@ -52,6 +50,7 @@ public class Done implements KnownSize, Serializable {
       String message = STRING_CODER.decode(inStream, Context.NESTED);
       return new Done(message);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index 8a278bf..e2130c9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -22,10 +22,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 
 /**
@@ -35,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 public class Event implements KnownSize, Serializable {
   private static final Coder<Integer> INT_CODER = VarIntCoder.of();
 
-  public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+  public static final Coder<Event> CODER = new CustomCoder<Event>() {
     @Override
     public void encode(Event value, OutputStream outStream, Coder.Context context)
         throws CoderException, IOException {
@@ -71,6 +70,7 @@ public class Event implements KnownSize, Serializable {
         throw new RuntimeException("invalid event encoding");
       }
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @Nullable

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
index 5d22651..cf1e571 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
@@ -39,7 +37,7 @@ public class IdNameReserve implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
 
-  public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
+  public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() {
     @Override
     public void encode(IdNameReserve value, OutputStream outStream,
         Coder.Context context)
@@ -58,6 +56,7 @@ public class IdNameReserve implements KnownSize, Serializable {
       long reserve = LONG_CODER.decode(inStream, Context.NESTED);
       return new IdNameReserve(id, name, reserve);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
index ac22879..86d1738 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
@@ -39,7 +37,7 @@ public class NameCityStateId implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
 
-  public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
+  public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() {
     @Override
     public void encode(NameCityStateId value, OutputStream outStream,
         Coder.Context context)
@@ -60,6 +58,7 @@ public class NameCityStateId implements KnownSize, Serializable {
       long id = LONG_CODER.decode(inStream, Context.NESTED);
       return new NameCityStateId(name, city, state, id);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
index 85c7183..906df94 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
@@ -38,7 +36,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 public class Person implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
-  public static final Coder<Person> CODER = new AtomicCoder<Person>() {
+  public static final Coder<Person> CODER = new CustomCoder<Person>() {
     @Override
     public void encode(Person value, OutputStream outStream,
         Coder.Context context)
@@ -67,6 +65,7 @@ public class Person implements KnownSize, Serializable {
       String extra = STRING_CODER.decode(inStream, Context.NESTED);
       return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   /** Id of person. */

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
index b7c2b14..68f2697 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 
 /**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
 public class SellerPrice implements KnownSize, Serializable {
   private static final Coder<Long> LONG_CODER = VarLongCoder.of();
 
-  public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() {
+  public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() {
     @Override
     public void encode(SellerPrice value, OutputStream outStream,
         Coder.Context context)
@@ -54,6 +52,7 @@ public class SellerPrice implements KnownSize, Serializable {
       long price = LONG_CODER.decode(inStream, Context.NESTED);
       return new SellerPrice(seller, price);
     }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
   };
 
   @JsonProperty

http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index cffc7a5..012d4e6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -27,14 +27,13 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
-
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -102,7 +101,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
 
     /** Coder for this class. */
     public static final Coder<Checkpoint> CODER_INSTANCE =
-        new AtomicCoder<Checkpoint>() {
+        new CustomCoder<Checkpoint>() {
           @Override
           public void encode(
               Checkpoint value,
@@ -121,6 +120,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
             long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
             return new Checkpoint(numEvents, wallclockBaseTime);
           }
+          @Override public void verifyDeterministic() throws NonDeterministicException {}
         };
 
     private long numEvents;