You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:41 UTC

[33/55] [abbrv] beam git commit: Migrate to Beam 2.1.0-SNAPSHOT

Migrate to Beam 2.1.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d5c3d00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d5c3d00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d5c3d00

Branch: refs/heads/master
Commit: 3d5c3d009b441a8085189f9d4ed1926a4042f816
Parents: 69953a0
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 9 15:25:54 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200

----------------------------------------------------------------------
 integration/java/nexmark/pom.xml                |  2 +-
 .../integration/nexmark/NexmarkLauncher.java    |  4 +-
 .../beam/integration/nexmark/NexmarkUtils.java  | 43 +++++++++----------
 .../beam/integration/nexmark/model/Auction.java | 45 ++++++++++----------
 .../integration/nexmark/model/AuctionBid.java   | 13 +++---
 .../integration/nexmark/model/AuctionCount.java | 14 +++---
 .../integration/nexmark/model/AuctionPrice.java | 13 +++---
 .../beam/integration/nexmark/model/Bid.java     | 25 ++++++-----
 .../nexmark/model/BidsPerSession.java           | 13 +++---
 .../nexmark/model/CategoryPrice.java            | 18 ++++----
 .../beam/integration/nexmark/model/Done.java    | 10 ++---
 .../beam/integration/nexmark/model/Event.java   | 24 +++++------
 .../nexmark/model/IdNameReserve.java            | 17 ++++----
 .../nexmark/model/NameCityStateId.java          | 22 +++++-----
 .../beam/integration/nexmark/model/Person.java  | 38 ++++++++---------
 .../integration/nexmark/model/SellerPrice.java  | 13 +++---
 .../nexmark/queries/WinningBids.java            | 16 +++----
 .../integration/nexmark/sources/Generator.java  | 19 +++------
 integration/java/pom.xml                        |  2 +-
 integration/pom.xml                             |  2 +-
 20 files changed, 163 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 35fe0f3..86b88bd 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-integration-java-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
index ea4ff58..db53191 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
@@ -739,7 +739,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
     NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
 
     PubsubIO.Read<PubsubMessage> io =
-        PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription)
+        PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
             .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -784,7 +784,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
     NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
 
     PubsubIO.Write<PubsubMessage> io =
-        PubsubIO.writePubsubMessages().to(shortTopic)
+        PubsubIO.writeMessages().to(shortTopic)
             .withIdAttribute(NexmarkUtils.PUBSUB_ID);
     if (!configuration.usePubsubPublishTime) {
       io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index ea851af..7707429 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -351,25 +351,25 @@ public class NexmarkUtils {
     CoderRegistry registry = p.getCoderRegistry();
     switch (coderStrategy) {
       case HAND:
-        registry.registerCoder(Auction.class, Auction.CODER);
-        registry.registerCoder(AuctionBid.class, AuctionBid.CODER);
-        registry.registerCoder(AuctionCount.class, AuctionCount.CODER);
-        registry.registerCoder(AuctionPrice.class, AuctionPrice.CODER);
-        registry.registerCoder(Bid.class, Bid.CODER);
-        registry.registerCoder(CategoryPrice.class, CategoryPrice.CODER);
-        registry.registerCoder(Event.class, Event.CODER);
-        registry.registerCoder(IdNameReserve.class, IdNameReserve.CODER);
-        registry.registerCoder(NameCityStateId.class, NameCityStateId.CODER);
-        registry.registerCoder(Person.class, Person.CODER);
-        registry.registerCoder(SellerPrice.class, SellerPrice.CODER);
-        registry.registerCoder(Done.class, Done.CODER);
-        registry.registerCoder(BidsPerSession.class, BidsPerSession.CODER);
+        registry.registerCoderForClass(Auction.class, Auction.CODER);
+        registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER);
+        registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER);
+        registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER);
+        registry.registerCoderForClass(Bid.class, Bid.CODER);
+        registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER);
+        registry.registerCoderForClass(Event.class, Event.CODER);
+        registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER);
+        registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER);
+        registry.registerCoderForClass(Person.class, Person.CODER);
+        registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER);
+        registry.registerCoderForClass(Done.class, Done.CODER);
+        registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER);
         break;
       case AVRO:
-        registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
+        registry.registerCoderProvider(AvroCoder.getCoderProvider());
         break;
       case JAVA:
-        registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
+        registry.registerCoderProvider(SerializableCoder.getCoderProvider());
         break;
     }
   }
@@ -621,22 +621,17 @@ public class NexmarkUtils {
     }
 
     @Override
-    public void encode(KnownSize value, OutputStream outStream, Context context)
+    public void encode(KnownSize value, OutputStream outStream)
         throws CoderException, IOException {
       @SuppressWarnings("unchecked")
       T typedValue = (T) value;
-      trueCoder.encode(typedValue, outStream, context);
+      trueCoder.encode(typedValue, outStream);
     }
 
     @Override
-    public KnownSize decode(InputStream inStream, Context context)
+    public KnownSize decode(InputStream inStream)
         throws CoderException, IOException {
-      return trueCoder.decode(inStream, context);
-    }
-
-    @Override
-    public List<? extends Coder<?>> getComponents() {
-      return ImmutableList.of(trueCoder);
+      return trueCoder.decode(inStream);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
index 5c018dc..9f5d7c0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -39,35 +39,34 @@ public class Auction implements KnownSize, Serializable {
 
   public static final Coder<Auction> CODER = new CustomCoder<Auction>() {
     @Override
-    public void encode(Auction value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(Auction value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-      STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
-      STRING_CODER.encode(value.description, outStream, Context.NESTED);
-      LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
-      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
-      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
-      LONG_CODER.encode(value.expires, outStream, Context.NESTED);
-      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
-      LONG_CODER.encode(value.category, outStream, Context.NESTED);
-      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+      LONG_CODER.encode(value.id, outStream);
+      STRING_CODER.encode(value.itemName, outStream);
+      STRING_CODER.encode(value.description, outStream);
+      LONG_CODER.encode(value.initialBid, outStream);
+      LONG_CODER.encode(value.reserve, outStream);
+      LONG_CODER.encode(value.dateTime, outStream);
+      LONG_CODER.encode(value.expires, outStream);
+      LONG_CODER.encode(value.seller, outStream);
+      LONG_CODER.encode(value.category, outStream);
+      STRING_CODER.encode(value.extra, outStream);
     }
 
     @Override
     public Auction decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      String itemName = STRING_CODER.decode(inStream, Context.NESTED);
-      String description = STRING_CODER.decode(inStream, Context.NESTED);
-      long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
-      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
-      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
-      long expires = LONG_CODER.decode(inStream, Context.NESTED);
-      long seller = LONG_CODER.decode(inStream, Context.NESTED);
-      long category = LONG_CODER.decode(inStream, Context.NESTED);
-      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      long id = LONG_CODER.decode(inStream);
+      String itemName = STRING_CODER.decode(inStream);
+      String description = STRING_CODER.decode(inStream);
+      long initialBid = LONG_CODER.decode(inStream);
+      long reserve = LONG_CODER.decode(inStream);
+      long dateTime = LONG_CODER.decode(inStream);
+      long expires = LONG_CODER.decode(inStream);
+      long seller = LONG_CODER.decode(inStream);
+      long category = LONG_CODER.decode(inStream);
+      String extra = STRING_CODER.decode(inStream);
       return new Auction(
           id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
           extra);

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
index b1d9ec2..b9d79db 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -34,19 +34,18 @@ import org.apache.beam.sdk.coders.CustomCoder;
 public class AuctionBid implements KnownSize, Serializable {
   public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
     @Override
-    public void encode(AuctionBid value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(AuctionBid value, OutputStream outStream)
         throws CoderException, IOException {
-      Auction.CODER.encode(value.auction, outStream, Context.NESTED);
-      Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+      Auction.CODER.encode(value.auction, outStream);
+      Bid.CODER.encode(value.bid, outStream);
     }
 
     @Override
     public AuctionBid decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
-      Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+      Auction auction = Auction.CODER.decode(inStream);
+      Bid bid = Bid.CODER.decode(inStream);
       return new AuctionBid(auction, bid);
     }
   };

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
index c83a455..0e643ff 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -37,19 +37,17 @@ public class AuctionCount implements KnownSize, Serializable {
 
   public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() {
     @Override
-    public void encode(AuctionCount value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(AuctionCount value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
-      LONG_CODER.encode(value.count, outStream, Context.NESTED);
+      LONG_CODER.encode(value.auction, outStream);
+      LONG_CODER.encode(value.count, outStream);
     }
 
     @Override
-    public AuctionCount decode(
-        InputStream inStream, Coder.Context context)
+    public AuctionCount decode(InputStream inStream)
         throws CoderException, IOException {
-      long auction = LONG_CODER.decode(inStream, Context.NESTED);
-      long count = LONG_CODER.decode(inStream, Context.NESTED);
+      long auction = LONG_CODER.decode(inStream);
+      long count = LONG_CODER.decode(inStream);
       return new AuctionCount(auction, count);
     }
   };

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
index 43d0b27..7d51a21 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -37,19 +37,18 @@ public class AuctionPrice implements KnownSize, Serializable {
 
   public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() {
     @Override
-    public void encode(AuctionPrice value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(AuctionPrice value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      LONG_CODER.encode(value.auction, outStream);
+      LONG_CODER.encode(value.price, outStream);
     }
 
     @Override
     public AuctionPrice decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      long auction = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      long auction = LONG_CODER.decode(inStream);
+      long price = LONG_CODER.decode(inStream);
       return new AuctionPrice(auction, price);
     }
   };

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
index faeb928..4fa9ea0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
@@ -40,25 +40,24 @@ public class Bid implements KnownSize, Serializable {
 
   public static final Coder<Bid> CODER = new CustomCoder<Bid>() {
     @Override
-    public void encode(Bid value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(Bid value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
-      LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
-      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
-      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+      LONG_CODER.encode(value.auction, outStream);
+      LONG_CODER.encode(value.bidder, outStream);
+      LONG_CODER.encode(value.price, outStream);
+      LONG_CODER.encode(value.dateTime, outStream);
+      STRING_CODER.encode(value.extra, outStream);
     }
 
     @Override
     public Bid decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      long auction = LONG_CODER.decode(inStream, Context.NESTED);
-      long bidder = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
-      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
-      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      long auction = LONG_CODER.decode(inStream);
+      long bidder = LONG_CODER.decode(inStream);
+      long price = LONG_CODER.decode(inStream);
+      long dateTime = LONG_CODER.decode(inStream);
+      String extra = STRING_CODER.decode(inStream);
       return new Bid(auction, bidder, price, dateTime, extra);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
index 6dddf34..3211456 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -37,19 +37,18 @@ public class BidsPerSession implements KnownSize, Serializable {
 
   public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() {
     @Override
-    public void encode(BidsPerSession value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(BidsPerSession value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.personId, outStream, Context.NESTED);
-      LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
+      LONG_CODER.encode(value.personId, outStream);
+      LONG_CODER.encode(value.bidsPerSession, outStream);
     }
 
     @Override
     public BidsPerSession decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      long personId = LONG_CODER.decode(inStream, Context.NESTED);
-      long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
+      long personId = LONG_CODER.decode(inStream);
+      long bidsPerSession = LONG_CODER.decode(inStream);
       return new BidsPerSession(personId, bidsPerSession);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
index ccb2bc7..2678198 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
@@ -39,21 +39,19 @@ public class CategoryPrice implements KnownSize, Serializable {
 
   public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() {
     @Override
-    public void encode(CategoryPrice value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(CategoryPrice value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.category, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
-      INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
+      LONG_CODER.encode(value.category, outStream);
+      LONG_CODER.encode(value.price, outStream);
+      INT_CODER.encode(value.isLast ? 1 : 0, outStream);
     }
 
     @Override
-    public CategoryPrice decode(
-        InputStream inStream, Coder.Context context)
+    public CategoryPrice decode(InputStream inStream)
         throws CoderException, IOException {
-      long category = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
-      boolean isLast = INT_CODER.decode(inStream, context) != 0;
+      long category = LONG_CODER.decode(inStream);
+      long price = LONG_CODER.decode(inStream);
+      boolean isLast = INT_CODER.decode(inStream) != 0;
       return new CategoryPrice(category, price, isLast);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
index 0c14e8f..b0a88d4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -37,17 +37,15 @@ public class Done implements KnownSize, Serializable {
 
   public static final Coder<Done> CODER = new CustomCoder<Done>() {
     @Override
-    public void encode(Done value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(Done value, OutputStream outStream)
         throws CoderException, IOException {
-      STRING_CODER.encode(value.message, outStream, Context.NESTED);
+      STRING_CODER.encode(value.message, outStream);
     }
 
     @Override
-    public Done decode(
-        InputStream inStream, Coder.Context context)
+    public Done decode(InputStream inStream)
         throws CoderException, IOException {
-      String message = STRING_CODER.decode(inStream, Context.NESTED);
+      String message = STRING_CODER.decode(inStream);
       return new Done(message);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index 1f1f096..d813833 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -36,17 +36,17 @@ public class Event implements KnownSize, Serializable {
 
   public static final Coder<Event> CODER = new CustomCoder<Event>() {
     @Override
-    public void encode(Event value, OutputStream outStream, Coder.Context context)
+    public void encode(Event value, OutputStream outStream)
         throws CoderException, IOException {
       if (value.newPerson != null) {
-        INT_CODER.encode(0, outStream, Context.NESTED);
-        Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+        INT_CODER.encode(0, outStream);
+        Person.CODER.encode(value.newPerson, outStream);
       } else if (value.newAuction != null) {
-        INT_CODER.encode(1, outStream, Context.NESTED);
-        Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+        INT_CODER.encode(1, outStream);
+        Auction.CODER.encode(value.newAuction, outStream);
       } else if (value.bid != null) {
-        INT_CODER.encode(2, outStream, Context.NESTED);
-        Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+        INT_CODER.encode(2, outStream);
+        Bid.CODER.encode(value.bid, outStream);
       } else {
         throw new RuntimeException("invalid event");
       }
@@ -54,17 +54,17 @@ public class Event implements KnownSize, Serializable {
 
     @Override
     public Event decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      int tag = INT_CODER.decode(inStream, context);
+      int tag = INT_CODER.decode(inStream);
       if (tag == 0) {
-        Person person = Person.CODER.decode(inStream, Context.NESTED);
+        Person person = Person.CODER.decode(inStream);
         return new Event(person);
       } else if (tag == 1) {
-        Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+        Auction auction = Auction.CODER.decode(inStream);
         return new Event(auction);
       } else if (tag == 2) {
-        Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+        Bid bid = Bid.CODER.decode(inStream);
         return new Event(bid);
       } else {
         throw new RuntimeException("invalid event encoding");

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
index 17b8c4a..8cade4e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -39,21 +39,20 @@ public class IdNameReserve implements KnownSize, Serializable {
 
   public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() {
     @Override
-    public void encode(IdNameReserve value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(IdNameReserve value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-      STRING_CODER.encode(value.name, outStream, Context.NESTED);
-      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+      LONG_CODER.encode(value.id, outStream);
+      STRING_CODER.encode(value.name, outStream);
+      LONG_CODER.encode(value.reserve, outStream);
     }
 
     @Override
     public IdNameReserve decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      String name = STRING_CODER.decode(inStream, Context.NESTED);
-      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+      long id = LONG_CODER.decode(inStream);
+      String name = STRING_CODER.decode(inStream);
+      long reserve = LONG_CODER.decode(inStream);
       return new IdNameReserve(id, name, reserve);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
index 28f25cd..37bd3c6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -39,23 +39,21 @@ public class NameCityStateId implements KnownSize, Serializable {
 
   public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() {
     @Override
-    public void encode(NameCityStateId value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(NameCityStateId value, OutputStream outStream)
         throws CoderException, IOException {
-      STRING_CODER.encode(value.name, outStream, Context.NESTED);
-      STRING_CODER.encode(value.city, outStream, Context.NESTED);
-      STRING_CODER.encode(value.state, outStream, Context.NESTED);
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.name, outStream);
+      STRING_CODER.encode(value.city, outStream);
+      STRING_CODER.encode(value.state, outStream);
+      LONG_CODER.encode(value.id, outStream);
     }
 
     @Override
-    public NameCityStateId decode(
-        InputStream inStream, Coder.Context context)
+    public NameCityStateId decode(InputStream inStream)
         throws CoderException, IOException {
-      String name = STRING_CODER.decode(inStream, Context.NESTED);
-      String city = STRING_CODER.decode(inStream, Context.NESTED);
-      String state = STRING_CODER.decode(inStream, Context.NESTED);
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      String name = STRING_CODER.decode(inStream);
+      String city = STRING_CODER.decode(inStream);
+      String state = STRING_CODER.decode(inStream);
+      long id = LONG_CODER.decode(inStream);
       return new NameCityStateId(name, city, state, id);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
index c690fd4..bde587d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -38,31 +38,29 @@ public class Person implements KnownSize, Serializable {
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
   public static final Coder<Person> CODER = new CustomCoder<Person>() {
     @Override
-    public void encode(Person value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(Person value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.id, outStream, Context.NESTED);
-      STRING_CODER.encode(value.name, outStream, Context.NESTED);
-      STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
-      STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
-      STRING_CODER.encode(value.city, outStream, Context.NESTED);
-      STRING_CODER.encode(value.state, outStream, Context.NESTED);
-      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
-      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+      LONG_CODER.encode(value.id, outStream);
+      STRING_CODER.encode(value.name, outStream);
+      STRING_CODER.encode(value.emailAddress, outStream);
+      STRING_CODER.encode(value.creditCard, outStream);
+      STRING_CODER.encode(value.city, outStream);
+      STRING_CODER.encode(value.state, outStream);
+      LONG_CODER.encode(value.dateTime, outStream);
+      STRING_CODER.encode(value.extra, outStream);
     }
 
     @Override
-    public Person decode(
-        InputStream inStream, Coder.Context context)
+    public Person decode(InputStream inStream)
         throws CoderException, IOException {
-      long id = LONG_CODER.decode(inStream, Context.NESTED);
-      String name = STRING_CODER.decode(inStream, Context.NESTED);
-      String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
-      String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
-      String city = STRING_CODER.decode(inStream, Context.NESTED);
-      String state = STRING_CODER.decode(inStream, Context.NESTED);
-      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
-      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      long id = LONG_CODER.decode(inStream);
+      String name = STRING_CODER.decode(inStream);
+      String emailAddress = STRING_CODER.decode(inStream);
+      String creditCard = STRING_CODER.decode(inStream);
+      String city = STRING_CODER.decode(inStream);
+      String state = STRING_CODER.decode(inStream);
+      long dateTime = LONG_CODER.decode(inStream);
+      String extra = STRING_CODER.decode(inStream);
       return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
index 52ff540..61537f6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -37,19 +37,18 @@ public class SellerPrice implements KnownSize, Serializable {
 
   public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() {
     @Override
-    public void encode(SellerPrice value, OutputStream outStream,
-        Coder.Context context)
+    public void encode(SellerPrice value, OutputStream outStream)
         throws CoderException, IOException {
-      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
-      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      LONG_CODER.encode(value.seller, outStream);
+      LONG_CODER.encode(value.price, outStream);
     }
 
     @Override
     public SellerPrice decode(
-        InputStream inStream, Coder.Context context)
+        InputStream inStream)
         throws CoderException, IOException {
-      long seller = LONG_CODER.decode(inStream, Context.NESTED);
-      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      long seller = LONG_CODER.decode(inStream);
+      long price = LONG_CODER.decode(inStream);
       return new SellerPrice(seller, price);
     }
     @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
index 52891a7..bd6c2ed 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -156,19 +156,19 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
     }
 
     @Override
-    public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
+    public void encode(AuctionOrBidWindow window, OutputStream outStream)
         throws IOException, CoderException {
-      SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
-      ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
-      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
+      SUPER_CODER.encode(window, outStream);
+      ID_CODER.encode(window.auction, outStream);
+      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream);
     }
 
     @Override
-    public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
+    public AuctionOrBidWindow decode(InputStream inStream)
         throws IOException, CoderException {
-      IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
-      long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
-      boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) != 0;
+      IntervalWindow superWindow = SUPER_CODER.decode(inStream);
+      long auction = ID_CODER.decode(inStream);
+      boolean isAuctionWindow = INT_CODER.decode(inStream) != 0;
       return new AuctionOrBidWindow(
           superWindow.start(), superWindow.end(), auction, isAuctionWindow);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index 2a2732b..4f548cd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -102,22 +102,17 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
     /** Coder for this class. */
     public static final Coder<Checkpoint> CODER_INSTANCE =
         new CustomCoder<Checkpoint>() {
-          @Override
-          public void encode(
-              Checkpoint value,
-              OutputStream outStream,
-              Coder.Context context)
-              throws CoderException, IOException {
-            LONG_CODER.encode(value.numEvents, outStream, Context.NESTED);
-            LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED);
+          @Override public void encode(Checkpoint value, OutputStream outStream)
+          throws CoderException, IOException {
+            LONG_CODER.encode(value.numEvents, outStream);
+            LONG_CODER.encode(value.wallclockBaseTime, outStream);
           }
 
           @Override
-          public Checkpoint decode(
-              InputStream inStream, Coder.Context context)
+          public Checkpoint decode(InputStream inStream)
               throws CoderException, IOException {
-            long numEvents = LONG_CODER.decode(inStream, Context.NESTED);
-            long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
+            long numEvents = LONG_CODER.decode(inStream);
+            long wallclockBaseTime = LONG_CODER.decode(inStream);
             return new Checkpoint(numEvents, wallclockBaseTime);
           }
           @Override public void verifyDeterministic() throws NonDeterministicException {}

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
index dcad4c3..b0c3853 100644
--- a/integration/java/pom.xml
+++ b/integration/java/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-integration-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index 31f293e..4254819 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-parent</artifactId>
-    <version>0.7.0-SNAPSHOT</version>
+    <version>2.1.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>