You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:38 UTC
[30/55] [abbrv] beam git commit: Fix compile after Coders and Pubsub
refactor
Fix compile after Coders and Pubsub refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8b96949b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8b96949b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8b96949b
Branch: refs/heads/master
Commit: 8b96949b934be1df7102aeb24ef4b23d9dd28812
Parents: b438fa7
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Fri Apr 28 10:29:38 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
.../integration/nexmark/NexmarkOptions.java | 3 +--
.../beam/integration/nexmark/WinningBids.java | 23 +++++++++++---------
.../integration/nexmark/io/PubsubClient.java | 2 +-
.../integration/nexmark/io/PubsubHelper.java | 2 +-
.../nexmark/io/PubsubJsonClient.java | 2 +-
.../nexmark/io/PubsubTestClient.java | 2 +-
.../beam/integration/nexmark/model/Auction.java | 6 ++---
.../integration/nexmark/model/AuctionBid.java | 6 ++---
.../integration/nexmark/model/AuctionCount.java | 6 ++---
.../integration/nexmark/model/AuctionPrice.java | 6 ++---
.../beam/integration/nexmark/model/Bid.java | 8 +++----
.../nexmark/model/BidsPerSession.java | 7 +++---
.../nexmark/model/CategoryPrice.java | 7 +++---
.../beam/integration/nexmark/model/Done.java | 7 +++---
.../beam/integration/nexmark/model/Event.java | 6 ++---
.../nexmark/model/IdNameReserve.java | 7 +++---
.../nexmark/model/NameCityStateId.java | 7 +++---
.../beam/integration/nexmark/model/Person.java | 7 +++---
.../integration/nexmark/model/SellerPrice.java | 7 +++---
.../integration/nexmark/sources/Generator.java | 6 ++---
20 files changed, 57 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
index 5d093ae..e1c1af2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkOptions.java
@@ -18,10 +18,9 @@
package org.apache.beam.integration.nexmark;
import javax.annotation.Nullable;
-
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PubsubOptions;
/**
* Command line flags.
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
index f2566b8..3815b9d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -35,9 +35,9 @@ import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.Counter;
@@ -145,7 +145,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
/**
* Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long.
*/
- private static class AuctionOrBidWindowCoder extends AtomicCoder<AuctionOrBidWindow> {
+ private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
private static final Coder<Long> ID_CODER = VarLongCoder.of();
@@ -157,22 +157,25 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
}
@Override
- public void encode(AuctionOrBidWindow window, OutputStream outStream, Context context)
+ public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
throws IOException, CoderException {
- SUPER_CODER.encode(window, outStream, Context.NESTED);
- ID_CODER.encode(window.auction, outStream, Context.NESTED);
- INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Context.NESTED);
+ SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
+ ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
+ INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
}
@Override
- public AuctionOrBidWindow decode(InputStream inStream, Context context)
+ public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
throws IOException, CoderException {
- IntervalWindow superWindow = SUPER_CODER.decode(inStream, Context.NESTED);
- long auction = ID_CODER.decode(inStream, Context.NESTED);
- boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) == 0 ? false : true;
+ IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
+ long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
+ boolean isAuctionWindow =
+ INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
return new AuctionOrBidWindow(
superWindow.start(), superWindow.end(), auction, isAuctionWindow);
}
+
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
}
/** Assign events to auction windows and merges them intelligently. */
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
index 687aa35..931fe6e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubClient.java
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
/**
* An (abstract) helper class for talking to Pubsub via an underlying transport.
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
index 15401b7..bcc5b1c 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
/**
* Helper for working with pubsub.
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
index b778a09..afddbd8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubJsonClient.java
@@ -47,7 +47,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.Transport;
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
index 125a8d6..69ba2b0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubTestClient.java
@@ -33,7 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
/**
* A (partial) implementation of {@link PubsubClient} for use by unit tests. Only suitable for
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
index ac30568..4b1a848 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -39,7 +37,7 @@ public class Auction implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
+ public static final Coder<Auction> CODER = new CustomCoder<Auction>() {
@Override
public void encode(Auction value, OutputStream outStream,
Coder.Context context)
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
index c014257..7f6b7c9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -19,23 +19,21 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.WinningBids;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
/**
* Result of {@link WinningBids} transform.
*/
public class AuctionBid implements KnownSize, Serializable {
- public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
+ public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
@Override
public void encode(AuctionBid value, OutputStream outStream,
Coder.Context context)
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
index aa16629..e6d3450 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
/**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
public class AuctionCount implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
+ public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() {
@Override
public void encode(AuctionCount value, OutputStream outStream,
Coder.Context context)
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
index f365cc8..cb971e2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
/**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
public class AuctionPrice implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
+ public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() {
@Override
public void encode(AuctionPrice value, OutputStream outStream,
Coder.Context context)
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
index 59a33c1..faeb928 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
@@ -19,17 +19,15 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Comparator;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -40,7 +38,7 @@ public class Bid implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
+ public static final Coder<Bid> CODER = new CustomCoder<Bid>() {
@Override
public void encode(Bid value, OutputStream outStream,
Coder.Context context)
@@ -63,6 +61,8 @@ public class Bid implements KnownSize, Serializable {
String extra = STRING_CODER.decode(inStream, Context.NESTED);
return new Bid(auction, bidder, price, dateTime, extra);
}
+
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
index 7c4dfae..26b6a41 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
/**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
public class BidsPerSession implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
+ public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() {
@Override
public void encode(BidsPerSession value, OutputStream outStream,
Coder.Context context)
@@ -54,6 +52,7 @@ public class BidsPerSession implements KnownSize, Serializable {
long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
return new BidsPerSession(personId, bidsPerSession);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@JsonProperty
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
index 6512cc1..ccb2bc7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -39,7 +37,7 @@ public class CategoryPrice implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<Integer> INT_CODER = VarIntCoder.of();
- public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
+ public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() {
@Override
public void encode(CategoryPrice value, OutputStream outStream,
Coder.Context context)
@@ -58,6 +56,7 @@ public class CategoryPrice implements KnownSize, Serializable {
boolean isLast = INT_CODER.decode(inStream, context) != 0;
return new CategoryPrice(category, price, isLast);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@JsonProperty
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
index 6009463..42999cd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
/**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
public class Done implements KnownSize, Serializable {
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<Done> CODER = new AtomicCoder<Done>() {
+ public static final Coder<Done> CODER = new CustomCoder<Done>() {
@Override
public void encode(Done value, OutputStream outStream,
Coder.Context context)
@@ -52,6 +50,7 @@ public class Done implements KnownSize, Serializable {
String message = STRING_CODER.decode(inStream, Context.NESTED);
return new Done(message);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@JsonProperty
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index 8a278bf..e2130c9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -22,10 +22,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
/**
@@ -35,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
public class Event implements KnownSize, Serializable {
private static final Coder<Integer> INT_CODER = VarIntCoder.of();
- public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+ public static final Coder<Event> CODER = new CustomCoder<Event>() {
@Override
public void encode(Event value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
@@ -71,6 +70,7 @@ public class Event implements KnownSize, Serializable {
throw new RuntimeException("invalid event encoding");
}
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@Nullable
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
index 5d22651..cf1e571 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -39,7 +37,7 @@ public class IdNameReserve implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<IdNameReserve> CODER = new AtomicCoder<IdNameReserve>() {
+ public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() {
@Override
public void encode(IdNameReserve value, OutputStream outStream,
Coder.Context context)
@@ -58,6 +56,7 @@ public class IdNameReserve implements KnownSize, Serializable {
long reserve = LONG_CODER.decode(inStream, Context.NESTED);
return new IdNameReserve(id, name, reserve);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@JsonProperty
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
index ac22879..86d1738 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -39,7 +37,7 @@ public class NameCityStateId implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<NameCityStateId> CODER = new AtomicCoder<NameCityStateId>() {
+ public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() {
@Override
public void encode(NameCityStateId value, OutputStream outStream,
Coder.Context context)
@@ -60,6 +58,7 @@ public class NameCityStateId implements KnownSize, Serializable {
long id = LONG_CODER.decode(inStream, Context.NESTED);
return new NameCityStateId(name, city, state, id);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@JsonProperty
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
index 85c7183..906df94 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -38,7 +36,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
public class Person implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
- public static final Coder<Person> CODER = new AtomicCoder<Person>() {
+ public static final Coder<Person> CODER = new CustomCoder<Person>() {
@Override
public void encode(Person value, OutputStream outStream,
Coder.Context context)
@@ -67,6 +65,7 @@ public class Person implements KnownSize, Serializable {
String extra = STRING_CODER.decode(inStream, Context.NESTED);
return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
/** Id of person. */
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
index b7c2b14..68f2697 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -19,16 +19,14 @@ package org.apache.beam.integration.nexmark.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
/**
@@ -37,7 +35,7 @@ import org.apache.beam.sdk.coders.VarLongCoder;
public class SellerPrice implements KnownSize, Serializable {
private static final Coder<Long> LONG_CODER = VarLongCoder.of();
- public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() {
+ public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() {
@Override
public void encode(SellerPrice value, OutputStream outStream,
Coder.Context context)
@@ -54,6 +52,7 @@ public class SellerPrice implements KnownSize, Serializable {
long price = LONG_CODER.decode(inStream, Context.NESTED);
return new SellerPrice(seller, price);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
@JsonProperty
http://git-wip-us.apache.org/repos/asf/beam/blob/8b96949b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index cffc7a5..012d4e6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -27,14 +27,13 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
-
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -102,7 +101,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/** Coder for this class. */
public static final Coder<Checkpoint> CODER_INSTANCE =
- new AtomicCoder<Checkpoint>() {
+ new CustomCoder<Checkpoint>() {
@Override
public void encode(
Checkpoint value,
@@ -121,6 +120,7 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
return new Checkpoint(numEvents, wallclockBaseTime);
}
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
};
private long numEvents;