You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:41 UTC
[33/55] [abbrv] beam git commit: Migrate to Beam 2.1.0-SNAPSHOT
Migrate to Beam 2.1.0-SNAPSHOT
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d5c3d00
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d5c3d00
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d5c3d00
Branch: refs/heads/master
Commit: 3d5c3d009b441a8085189f9d4ed1926a4042f816
Parents: 69953a0
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Tue May 9 15:25:54 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
integration/java/nexmark/pom.xml | 2 +-
.../integration/nexmark/NexmarkLauncher.java | 4 +-
.../beam/integration/nexmark/NexmarkUtils.java | 43 +++++++++----------
.../beam/integration/nexmark/model/Auction.java | 45 ++++++++++----------
.../integration/nexmark/model/AuctionBid.java | 13 +++---
.../integration/nexmark/model/AuctionCount.java | 14 +++---
.../integration/nexmark/model/AuctionPrice.java | 13 +++---
.../beam/integration/nexmark/model/Bid.java | 25 ++++++-----
.../nexmark/model/BidsPerSession.java | 13 +++---
.../nexmark/model/CategoryPrice.java | 18 ++++----
.../beam/integration/nexmark/model/Done.java | 10 ++---
.../beam/integration/nexmark/model/Event.java | 24 +++++------
.../nexmark/model/IdNameReserve.java | 17 ++++----
.../nexmark/model/NameCityStateId.java | 22 +++++-----
.../beam/integration/nexmark/model/Person.java | 38 ++++++++---------
.../integration/nexmark/model/SellerPrice.java | 13 +++---
.../nexmark/queries/WinningBids.java | 16 +++----
.../integration/nexmark/sources/Generator.java | 19 +++------
integration/java/pom.xml | 2 +-
integration/pom.xml | 2 +-
20 files changed, 163 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml
index 35fe0f3..86b88bd 100644
--- a/integration/java/nexmark/pom.xml
+++ b/integration/java/nexmark/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-integration-java-parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
index ea4ff58..db53191 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkLauncher.java
@@ -739,7 +739,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
NexmarkUtils.console("Reading events from Pubsub %s", shortSubscription);
PubsubIO.Read<PubsubMessage> io =
- PubsubIO.readPubsubMessagesWithAttributes().fromSubscription(shortSubscription)
+ PubsubIO.readMessagesWithAttributes().fromSubscription(shortSubscription)
.withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
@@ -784,7 +784,7 @@ public class NexmarkLauncher<OptionT extends NexmarkOptions> {
NexmarkUtils.console("Writing events to Pubsub %s", shortTopic);
PubsubIO.Write<PubsubMessage> io =
- PubsubIO.writePubsubMessages().to(shortTopic)
+ PubsubIO.writeMessages().to(shortTopic)
.withIdAttribute(NexmarkUtils.PUBSUB_ID);
if (!configuration.usePubsubPublishTime) {
io = io.withTimestampAttribute(NexmarkUtils.PUBSUB_TIMESTAMP);
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
index ea851af..7707429 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -351,25 +351,25 @@ public class NexmarkUtils {
CoderRegistry registry = p.getCoderRegistry();
switch (coderStrategy) {
case HAND:
- registry.registerCoder(Auction.class, Auction.CODER);
- registry.registerCoder(AuctionBid.class, AuctionBid.CODER);
- registry.registerCoder(AuctionCount.class, AuctionCount.CODER);
- registry.registerCoder(AuctionPrice.class, AuctionPrice.CODER);
- registry.registerCoder(Bid.class, Bid.CODER);
- registry.registerCoder(CategoryPrice.class, CategoryPrice.CODER);
- registry.registerCoder(Event.class, Event.CODER);
- registry.registerCoder(IdNameReserve.class, IdNameReserve.CODER);
- registry.registerCoder(NameCityStateId.class, NameCityStateId.CODER);
- registry.registerCoder(Person.class, Person.CODER);
- registry.registerCoder(SellerPrice.class, SellerPrice.CODER);
- registry.registerCoder(Done.class, Done.CODER);
- registry.registerCoder(BidsPerSession.class, BidsPerSession.CODER);
+ registry.registerCoderForClass(Auction.class, Auction.CODER);
+ registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER);
+ registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER);
+ registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER);
+ registry.registerCoderForClass(Bid.class, Bid.CODER);
+ registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER);
+ registry.registerCoderForClass(Event.class, Event.CODER);
+ registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER);
+ registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER);
+ registry.registerCoderForClass(Person.class, Person.CODER);
+ registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER);
+ registry.registerCoderForClass(Done.class, Done.CODER);
+ registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER);
break;
case AVRO:
- registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
+ registry.registerCoderProvider(AvroCoder.getCoderProvider());
break;
case JAVA:
- registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
+ registry.registerCoderProvider(SerializableCoder.getCoderProvider());
break;
}
}
@@ -621,22 +621,17 @@ public class NexmarkUtils {
}
@Override
- public void encode(KnownSize value, OutputStream outStream, Context context)
+ public void encode(KnownSize value, OutputStream outStream)
throws CoderException, IOException {
@SuppressWarnings("unchecked")
T typedValue = (T) value;
- trueCoder.encode(typedValue, outStream, context);
+ trueCoder.encode(typedValue, outStream);
}
@Override
- public KnownSize decode(InputStream inStream, Context context)
+ public KnownSize decode(InputStream inStream)
throws CoderException, IOException {
- return trueCoder.decode(inStream, context);
- }
-
- @Override
- public List<? extends Coder<?>> getComponents() {
- return ImmutableList.of(trueCoder);
+ return trueCoder.decode(inStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
index 5c018dc..9f5d7c0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -39,35 +39,34 @@ public class Auction implements KnownSize, Serializable {
public static final Coder<Auction> CODER = new CustomCoder<Auction>() {
@Override
- public void encode(Auction value, OutputStream outStream,
- Coder.Context context)
+ public void encode(Auction value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
- STRING_CODER.encode(value.description, outStream, Context.NESTED);
- LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
- LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
- LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
- LONG_CODER.encode(value.expires, outStream, Context.NESTED);
- LONG_CODER.encode(value.seller, outStream, Context.NESTED);
- LONG_CODER.encode(value.category, outStream, Context.NESTED);
- STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ LONG_CODER.encode(value.id, outStream);
+ STRING_CODER.encode(value.itemName, outStream);
+ STRING_CODER.encode(value.description, outStream);
+ LONG_CODER.encode(value.initialBid, outStream);
+ LONG_CODER.encode(value.reserve, outStream);
+ LONG_CODER.encode(value.dateTime, outStream);
+ LONG_CODER.encode(value.expires, outStream);
+ LONG_CODER.encode(value.seller, outStream);
+ LONG_CODER.encode(value.category, outStream);
+ STRING_CODER.encode(value.extra, outStream);
}
@Override
public Auction decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- String itemName = STRING_CODER.decode(inStream, Context.NESTED);
- String description = STRING_CODER.decode(inStream, Context.NESTED);
- long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
- long reserve = LONG_CODER.decode(inStream, Context.NESTED);
- long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
- long expires = LONG_CODER.decode(inStream, Context.NESTED);
- long seller = LONG_CODER.decode(inStream, Context.NESTED);
- long category = LONG_CODER.decode(inStream, Context.NESTED);
- String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ long id = LONG_CODER.decode(inStream);
+ String itemName = STRING_CODER.decode(inStream);
+ String description = STRING_CODER.decode(inStream);
+ long initialBid = LONG_CODER.decode(inStream);
+ long reserve = LONG_CODER.decode(inStream);
+ long dateTime = LONG_CODER.decode(inStream);
+ long expires = LONG_CODER.decode(inStream);
+ long seller = LONG_CODER.decode(inStream);
+ long category = LONG_CODER.decode(inStream);
+ String extra = STRING_CODER.decode(inStream);
return new Auction(
id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
extra);
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
index b1d9ec2..b9d79db 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -34,19 +34,18 @@ import org.apache.beam.sdk.coders.CustomCoder;
public class AuctionBid implements KnownSize, Serializable {
public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
@Override
- public void encode(AuctionBid value, OutputStream outStream,
- Coder.Context context)
+ public void encode(AuctionBid value, OutputStream outStream)
throws CoderException, IOException {
- Auction.CODER.encode(value.auction, outStream, Context.NESTED);
- Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ Auction.CODER.encode(value.auction, outStream);
+ Bid.CODER.encode(value.bid, outStream);
}
@Override
public AuctionBid decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
- Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ Auction auction = Auction.CODER.decode(inStream);
+ Bid bid = Bid.CODER.decode(inStream);
return new AuctionBid(auction, bid);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
index c83a455..0e643ff 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -37,19 +37,17 @@ public class AuctionCount implements KnownSize, Serializable {
public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() {
@Override
- public void encode(AuctionCount value, OutputStream outStream,
- Coder.Context context)
+ public void encode(AuctionCount value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream, Context.NESTED);
- LONG_CODER.encode(value.count, outStream, Context.NESTED);
+ LONG_CODER.encode(value.auction, outStream);
+ LONG_CODER.encode(value.count, outStream);
}
@Override
- public AuctionCount decode(
- InputStream inStream, Coder.Context context)
+ public AuctionCount decode(InputStream inStream)
throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream, Context.NESTED);
- long count = LONG_CODER.decode(inStream, Context.NESTED);
+ long auction = LONG_CODER.decode(inStream);
+ long count = LONG_CODER.decode(inStream);
return new AuctionCount(auction, count);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
index 43d0b27..7d51a21 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -37,19 +37,18 @@ public class AuctionPrice implements KnownSize, Serializable {
public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() {
@Override
- public void encode(AuctionPrice value, OutputStream outStream,
- Coder.Context context)
+ public void encode(AuctionPrice value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ LONG_CODER.encode(value.auction, outStream);
+ LONG_CODER.encode(value.price, outStream);
}
@Override
public AuctionPrice decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
+ long auction = LONG_CODER.decode(inStream);
+ long price = LONG_CODER.decode(inStream);
return new AuctionPrice(auction, price);
}
};
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
index faeb928..4fa9ea0 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
@@ -40,25 +40,24 @@ public class Bid implements KnownSize, Serializable {
public static final Coder<Bid> CODER = new CustomCoder<Bid>() {
@Override
- public void encode(Bid value, OutputStream outStream,
- Coder.Context context)
+ public void encode(Bid value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.auction, outStream, Context.NESTED);
- LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
- LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
- STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ LONG_CODER.encode(value.auction, outStream);
+ LONG_CODER.encode(value.bidder, outStream);
+ LONG_CODER.encode(value.price, outStream);
+ LONG_CODER.encode(value.dateTime, outStream);
+ STRING_CODER.encode(value.extra, outStream);
}
@Override
public Bid decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- long auction = LONG_CODER.decode(inStream, Context.NESTED);
- long bidder = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
- long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
- String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ long auction = LONG_CODER.decode(inStream);
+ long bidder = LONG_CODER.decode(inStream);
+ long price = LONG_CODER.decode(inStream);
+ long dateTime = LONG_CODER.decode(inStream);
+ String extra = STRING_CODER.decode(inStream);
return new Bid(auction, bidder, price, dateTime, extra);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
index 6dddf34..3211456 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -37,19 +37,18 @@ public class BidsPerSession implements KnownSize, Serializable {
public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() {
@Override
- public void encode(BidsPerSession value, OutputStream outStream,
- Coder.Context context)
+ public void encode(BidsPerSession value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.personId, outStream, Context.NESTED);
- LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
+ LONG_CODER.encode(value.personId, outStream);
+ LONG_CODER.encode(value.bidsPerSession, outStream);
}
@Override
public BidsPerSession decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- long personId = LONG_CODER.decode(inStream, Context.NESTED);
- long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
+ long personId = LONG_CODER.decode(inStream);
+ long bidsPerSession = LONG_CODER.decode(inStream);
return new BidsPerSession(personId, bidsPerSession);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
index ccb2bc7..2678198 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/CategoryPrice.java
@@ -39,21 +39,19 @@ public class CategoryPrice implements KnownSize, Serializable {
public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() {
@Override
- public void encode(CategoryPrice value, OutputStream outStream,
- Coder.Context context)
+ public void encode(CategoryPrice value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.category, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
- INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
+ LONG_CODER.encode(value.category, outStream);
+ LONG_CODER.encode(value.price, outStream);
+ INT_CODER.encode(value.isLast ? 1 : 0, outStream);
}
@Override
- public CategoryPrice decode(
- InputStream inStream, Coder.Context context)
+ public CategoryPrice decode(InputStream inStream)
throws CoderException, IOException {
- long category = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
- boolean isLast = INT_CODER.decode(inStream, context) != 0;
+ long category = LONG_CODER.decode(inStream);
+ long price = LONG_CODER.decode(inStream);
+ boolean isLast = INT_CODER.decode(inStream) != 0;
return new CategoryPrice(category, price, isLast);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
index 0c14e8f..b0a88d4 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Done.java
@@ -37,17 +37,15 @@ public class Done implements KnownSize, Serializable {
public static final Coder<Done> CODER = new CustomCoder<Done>() {
@Override
- public void encode(Done value, OutputStream outStream,
- Coder.Context context)
+ public void encode(Done value, OutputStream outStream)
throws CoderException, IOException {
- STRING_CODER.encode(value.message, outStream, Context.NESTED);
+ STRING_CODER.encode(value.message, outStream);
}
@Override
- public Done decode(
- InputStream inStream, Coder.Context context)
+ public Done decode(InputStream inStream)
throws CoderException, IOException {
- String message = STRING_CODER.decode(inStream, Context.NESTED);
+ String message = STRING_CODER.decode(inStream);
return new Done(message);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
index 1f1f096..d813833 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Event.java
@@ -36,17 +36,17 @@ public class Event implements KnownSize, Serializable {
public static final Coder<Event> CODER = new CustomCoder<Event>() {
@Override
- public void encode(Event value, OutputStream outStream, Coder.Context context)
+ public void encode(Event value, OutputStream outStream)
throws CoderException, IOException {
if (value.newPerson != null) {
- INT_CODER.encode(0, outStream, Context.NESTED);
- Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+ INT_CODER.encode(0, outStream);
+ Person.CODER.encode(value.newPerson, outStream);
} else if (value.newAuction != null) {
- INT_CODER.encode(1, outStream, Context.NESTED);
- Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+ INT_CODER.encode(1, outStream);
+ Auction.CODER.encode(value.newAuction, outStream);
} else if (value.bid != null) {
- INT_CODER.encode(2, outStream, Context.NESTED);
- Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+ INT_CODER.encode(2, outStream);
+ Bid.CODER.encode(value.bid, outStream);
} else {
throw new RuntimeException("invalid event");
}
@@ -54,17 +54,17 @@ public class Event implements KnownSize, Serializable {
@Override
public Event decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- int tag = INT_CODER.decode(inStream, context);
+ int tag = INT_CODER.decode(inStream);
if (tag == 0) {
- Person person = Person.CODER.decode(inStream, Context.NESTED);
+ Person person = Person.CODER.decode(inStream);
return new Event(person);
} else if (tag == 1) {
- Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+ Auction auction = Auction.CODER.decode(inStream);
return new Event(auction);
} else if (tag == 2) {
- Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+ Bid bid = Bid.CODER.decode(inStream);
return new Event(bid);
} else {
throw new RuntimeException("invalid event encoding");
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
index 17b8c4a..8cade4e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/IdNameReserve.java
@@ -39,21 +39,20 @@ public class IdNameReserve implements KnownSize, Serializable {
public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() {
@Override
- public void encode(IdNameReserve value, OutputStream outStream,
- Coder.Context context)
+ public void encode(IdNameReserve value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- STRING_CODER.encode(value.name, outStream, Context.NESTED);
- LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+ LONG_CODER.encode(value.id, outStream);
+ STRING_CODER.encode(value.name, outStream);
+ LONG_CODER.encode(value.reserve, outStream);
}
@Override
public IdNameReserve decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- String name = STRING_CODER.decode(inStream, Context.NESTED);
- long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+ long id = LONG_CODER.decode(inStream);
+ String name = STRING_CODER.decode(inStream);
+ long reserve = LONG_CODER.decode(inStream);
return new IdNameReserve(id, name, reserve);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
index 28f25cd..37bd3c6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/NameCityStateId.java
@@ -39,23 +39,21 @@ public class NameCityStateId implements KnownSize, Serializable {
public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() {
@Override
- public void encode(NameCityStateId value, OutputStream outStream,
- Coder.Context context)
+ public void encode(NameCityStateId value, OutputStream outStream)
throws CoderException, IOException {
- STRING_CODER.encode(value.name, outStream, Context.NESTED);
- STRING_CODER.encode(value.city, outStream, Context.NESTED);
- STRING_CODER.encode(value.state, outStream, Context.NESTED);
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ STRING_CODER.encode(value.name, outStream);
+ STRING_CODER.encode(value.city, outStream);
+ STRING_CODER.encode(value.state, outStream);
+ LONG_CODER.encode(value.id, outStream);
}
@Override
- public NameCityStateId decode(
- InputStream inStream, Coder.Context context)
+ public NameCityStateId decode(InputStream inStream)
throws CoderException, IOException {
- String name = STRING_CODER.decode(inStream, Context.NESTED);
- String city = STRING_CODER.decode(inStream, Context.NESTED);
- String state = STRING_CODER.decode(inStream, Context.NESTED);
- long id = LONG_CODER.decode(inStream, Context.NESTED);
+ String name = STRING_CODER.decode(inStream);
+ String city = STRING_CODER.decode(inStream);
+ String state = STRING_CODER.decode(inStream);
+ long id = LONG_CODER.decode(inStream);
return new NameCityStateId(name, city, state, id);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
index c690fd4..bde587d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Person.java
@@ -38,31 +38,29 @@ public class Person implements KnownSize, Serializable {
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
public static final Coder<Person> CODER = new CustomCoder<Person>() {
@Override
- public void encode(Person value, OutputStream outStream,
- Coder.Context context)
+ public void encode(Person value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.id, outStream, Context.NESTED);
- STRING_CODER.encode(value.name, outStream, Context.NESTED);
- STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
- STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
- STRING_CODER.encode(value.city, outStream, Context.NESTED);
- STRING_CODER.encode(value.state, outStream, Context.NESTED);
- LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
- STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ LONG_CODER.encode(value.id, outStream);
+ STRING_CODER.encode(value.name, outStream);
+ STRING_CODER.encode(value.emailAddress, outStream);
+ STRING_CODER.encode(value.creditCard, outStream);
+ STRING_CODER.encode(value.city, outStream);
+ STRING_CODER.encode(value.state, outStream);
+ LONG_CODER.encode(value.dateTime, outStream);
+ STRING_CODER.encode(value.extra, outStream);
}
@Override
- public Person decode(
- InputStream inStream, Coder.Context context)
+ public Person decode(InputStream inStream)
throws CoderException, IOException {
- long id = LONG_CODER.decode(inStream, Context.NESTED);
- String name = STRING_CODER.decode(inStream, Context.NESTED);
- String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
- String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
- String city = STRING_CODER.decode(inStream, Context.NESTED);
- String state = STRING_CODER.decode(inStream, Context.NESTED);
- long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
- String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ long id = LONG_CODER.decode(inStream);
+ String name = STRING_CODER.decode(inStream);
+ String emailAddress = STRING_CODER.decode(inStream);
+ String creditCard = STRING_CODER.decode(inStream);
+ String city = STRING_CODER.decode(inStream);
+ String state = STRING_CODER.decode(inStream);
+ long dateTime = LONG_CODER.decode(inStream);
+ String extra = STRING_CODER.decode(inStream);
return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
index 52ff540..61537f6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/SellerPrice.java
@@ -37,19 +37,18 @@ public class SellerPrice implements KnownSize, Serializable {
public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() {
@Override
- public void encode(SellerPrice value, OutputStream outStream,
- Coder.Context context)
+ public void encode(SellerPrice value, OutputStream outStream)
throws CoderException, IOException {
- LONG_CODER.encode(value.seller, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
+ LONG_CODER.encode(value.seller, outStream);
+ LONG_CODER.encode(value.price, outStream);
}
@Override
public SellerPrice decode(
- InputStream inStream, Coder.Context context)
+ InputStream inStream)
throws CoderException, IOException {
- long seller = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
+ long seller = LONG_CODER.decode(inStream);
+ long price = LONG_CODER.decode(inStream);
return new SellerPrice(seller, price);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
index 52891a7..bd6c2ed 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -156,19 +156,19 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
}
@Override
- public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
+ public void encode(AuctionOrBidWindow window, OutputStream outStream)
throws IOException, CoderException {
- SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
- ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
- INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
+ SUPER_CODER.encode(window, outStream);
+ ID_CODER.encode(window.auction, outStream);
+ INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream);
}
@Override
- public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
+ public AuctionOrBidWindow decode(InputStream inStream)
throws IOException, CoderException {
- IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
- long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
- boolean isAuctionWindow = INT_CODER.decode(inStream, Context.NESTED) != 0;
+ IntervalWindow superWindow = SUPER_CODER.decode(inStream);
+ long auction = ID_CODER.decode(inStream);
+ boolean isAuctionWindow = INT_CODER.decode(inStream) != 0;
return new AuctionOrBidWindow(
superWindow.start(), superWindow.end(), auction, isAuctionWindow);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
index 2a2732b..4f548cd 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/sources/Generator.java
@@ -102,22 +102,17 @@ public class Generator implements Iterator<TimestampedValue<Event>>, Serializabl
/** Coder for this class. */
public static final Coder<Checkpoint> CODER_INSTANCE =
new CustomCoder<Checkpoint>() {
- @Override
- public void encode(
- Checkpoint value,
- OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.numEvents, outStream, Context.NESTED);
- LONG_CODER.encode(value.wallclockBaseTime, outStream, Context.NESTED);
+ @Override public void encode(Checkpoint value, OutputStream outStream)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.numEvents, outStream);
+ LONG_CODER.encode(value.wallclockBaseTime, outStream);
}
@Override
- public Checkpoint decode(
- InputStream inStream, Coder.Context context)
+ public Checkpoint decode(InputStream inStream)
throws CoderException, IOException {
- long numEvents = LONG_CODER.decode(inStream, Context.NESTED);
- long wallclockBaseTime = LONG_CODER.decode(inStream, Context.NESTED);
+ long numEvents = LONG_CODER.decode(inStream);
+ long wallclockBaseTime = LONG_CODER.decode(inStream);
return new Checkpoint(numEvents, wallclockBaseTime);
}
@Override public void verifyDeterministic() throws NonDeterministicException {}
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
index dcad4c3..b0c3853 100644
--- a/integration/java/pom.xml
+++ b/integration/java/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-integration-parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/3d5c3d00/integration/pom.xml
----------------------------------------------------------------------
diff --git a/integration/pom.xml b/integration/pom.xml
index 31f293e..4254819 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-parent</artifactId>
- <version>0.7.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>