You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:36 UTC
[28/55] [abbrv] beam git commit: Move WinningBids into the queries package
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index 12b16f1..71364ba 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
index e4b72d2..6b98e2a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
@@ -26,9 +26,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
index 61991c8..9c0fe6d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
@@ -19,9 +19,7 @@ package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.Monitor;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
index 9405ac8..634a58e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
@@ -24,11 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBidsSimulator;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 34b7b50..18ce578 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.AuctionCount;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
index 6bf65dc..24d9a00 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
@@ -24,9 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.AuctionCount;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
index 2a5ab702..65789ab 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
@@ -22,9 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
index 4325337..0691714 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
@@ -22,11 +22,8 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBidsSimulator;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
index f3d1ba4..2a94ca9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
index 0a80e59..5c039f9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
@@ -23,9 +23,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
index e7daccd..603841b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
index 1161994..8c76bc6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
@@ -24,9 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Auction;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
index aed827b..6dd189d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
@@ -18,9 +18,7 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
import org.apache.beam.integration.nexmark.model.AuctionBid;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
index b88d60a..d117e2d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
@@ -21,10 +21,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
-import org.apache.beam.integration.nexmark.WinningBidsSimulator;
import org.apache.beam.sdk.values.TimestampedValue;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
new file mode 100644
index 0000000..11a4d38
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * A transform to find the winning bid for each closed auction. In pseudo CQL syntax:
+ *
+ * <pre>{@code
+ * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
+ * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ * WHERE A.id = B.auction AND B.dateTime < A.expires AND A.expires < CURRENT_TIME
+ * GROUP BY A.id
+ * }</pre>
+ *
+ * <p>We will also check that the winning bid is above the auction reserve. Note that
+ * we ignore the auction opening bid value since it has no impact on which bid eventually wins,
+ * if any.
+ *
+ * <p>Our implementation will use a custom windowing function in order to bring bids and
+ * auctions together without requiring global state.
+ */
+public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
+ /** Windows for open auctions and bids. */
+ private static class AuctionOrBidWindow extends IntervalWindow implements Serializable {
+ /** Id of auction this window is for. */
+ public final long auction;
+
+ /**
+ * True if this window represents an actual auction, and thus has a start/end
+ * time matching that of the auction. False if this window represents a bid, in which case
+ * its interval is a finite estimate starting at the bid's timestamp (see {@code forBid}).
+ */
+ public final boolean isAuctionWindow;
+
+ /** For avro only. */
+ private AuctionOrBidWindow() {
+ super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
+ auction = 0;
+ isAuctionWindow = false;
+ }
+
+ private AuctionOrBidWindow(
+ Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
+ super(start, end);
+ this.auction = auctionId;
+ this.isAuctionWindow = isAuctionWindow;
+ }
+
+ /** Return an auction window for {@code auction}. */
+ public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
+ AuctionOrBidWindow result =
+ new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
+ return result;
+ }
+
+ /**
+ * Return a bid window for {@code bid}. It should later be merged into
+ * the corresponding auction window. However, it is possible this bid is for an already
+ * expired auction, or for an auction which the system has not yet seen. So we
+ * give the bid a bit of wiggle room in its interval.
+ */
+ public static AuctionOrBidWindow forBid(
+ long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
+ // At this point we don't know which auctions are still valid, and the bid may
+ // be for an auction which won't start until some unknown time in the future
+ // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
+ // A real system would atomically reconcile bids and auctions by a separate mechanism.
+ // If we give bids an unbounded window it is possible a bid for an auction which
+ // has already expired would cause the system watermark to stall, since that window
+ // would never be retired.
+ // Instead, we will just give the bid a finite window which expires at
+ // the upper bound of auctions assuming the auction starts at the same time as the bid,
+ // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
+ AuctionOrBidWindow result = new AuctionOrBidWindow(
+ timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
+ return result;
+ }
+
+ /** Is this an auction window? */
+ public boolean isAuctionWindow() {
+ return isAuctionWindow;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
+ start(), end(), auction, isAuctionWindow);
+ }
+ }
+
+ /**
+ * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow}, an auction id long and a flag int.
+ */
+ private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
+ private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
+ private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
+ private static final Coder<Long> ID_CODER = VarLongCoder.of();
+ private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+ @JsonCreator
+ public static AuctionOrBidWindowCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
+ throws IOException, CoderException {
+ SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
+ ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
+ INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
+ }
+
+ @Override
+ public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
+ throws IOException, CoderException {
+ IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
+ long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
+ boolean isAuctionWindow =
+ INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
+ return new AuctionOrBidWindow(
+ superWindow.start(), superWindow.end(), auction, isAuctionWindow);
+ }
+
+ @Override public void verifyDeterministic() throws NonDeterministicException {}
+ }
+
+ /** Assigns events to auction or bid windows and merges them intelligently. */
+ private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
+ /** Expected duration of auctions in ms. */
+ private final long expectedAuctionDurationMs;
+
+ public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
+ this.expectedAuctionDurationMs = expectedAuctionDurationMs;
+ }
+
+ @Override
+ public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
+ Event event = c.element();
+ if (event.newAuction != null) {
+ // Assign auctions to an auction window which expires at the auction's close.
+ return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
+ } else if (event.bid != null) {
+ // Assign bids to a temporary bid window which will later be merged into the appropriate
+ // auction window.
+ return Arrays.asList(
+ AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
+ } else {
+ // Don't assign people to any window. They will thus be dropped.
+ return Arrays.asList();
+ }
+ }
+
+ @Override
+ public void mergeWindows(MergeContext c) throws Exception {
+ // Split and index the auction and bid windows by auction id.
+ Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>();
+ Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>();
+ for (AuctionOrBidWindow window : c.windows()) {
+ if (window.isAuctionWindow()) {
+ idToTrueAuctionWindow.put(window.auction, window);
+ } else {
+ List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction);
+ if (bidWindows == null) {
+ bidWindows = new ArrayList<>();
+ idToBidAuctionWindows.put(window.auction, bidWindows);
+ }
+ bidWindows.add(window);
+ }
+ }
+
+ // Merge all 'bid' windows into their corresponding 'auction' window, provided the
+ // auction has not expired.
+ for (long auction : idToTrueAuctionWindow.keySet()) {
+ AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction);
+ List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
+ if (bidWindows != null) {
+ List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
+ for (AuctionOrBidWindow bidWindow : bidWindows) {
+ if (bidWindow.start().isBefore(auctionWindow.end())) {
+ toBeMerged.add(bidWindow);
+ }
+ // else: This bid window will remain until its expire time, at which point it
+ // will expire without ever contributing to an output.
+ }
+ if (!toBeMerged.isEmpty()) {
+ toBeMerged.add(auctionWindow);
+ c.merge(toBeMerged, auctionWindow);
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return other instanceof AuctionOrBidWindowFn;
+ }
+
+ @Override
+ public Coder<AuctionOrBidWindow> windowCoder() {
+ return AuctionOrBidWindowCoder.of();
+ }
+
+ @Override
+ public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
+ throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
+ }
+
+ /**
+ * Below we will GBK auctions and bids on their auction ids. Then we will reduce those
+ * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at
+ * least one valid bid. We would like those output pairs to have a timestamp of the auction's
+ * expiry (since that's the earliest we know for sure we have the correct winner). We would
+ * also like to make sure that winning results are available to following stages at the auction's
+ * expiry.
+ *
+ * <p>Each result of the GBK will have a timestamp of the min of the result of this object's
+ * getOutputTime over all records which end up in one of its iterables. Thus we get the
+ * desired behavior if we ignore each record's timestamp and always return the auction window's
+ * 'maxTimestamp', which will correspond to the auction's expiry.
+ *
+ * <p>In contrast, if this object's getOutputTime were to return 'inputTimestamp'
+ * (the usual implementation), then each GBK record will take as its timestamp the minimum of
+ * the timestamps of all bids and auctions within it, which will always be the auction's
+ * timestamp. An auction which expires well into the future would thus hold up the watermark
+ * of the GBK results until that auction expired. That in turn would hold up all winning pairs.
+ */
+ @Override
+ public Instant getOutputTime(
+ Instant inputTimestamp, AuctionOrBidWindow window) {
+ return window.maxTimestamp();
+ }
+ }
+
+ private final AuctionOrBidWindowFn auctionOrBidWindowFn;
+
+ public WinningBids(String name, NexmarkConfiguration configuration) {
+ super(name);
+ // What's the expected auction time (when the system is running at the lowest event rate).
+ long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
+ configuration.firstEventRate, configuration.nextEventRate,
+ configuration.rateUnit, configuration.numEventGenerators);
+ long longestDelayUs = 0;
+ for (int i = 0; i < interEventDelayUs.length; i++) {
+ longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]);
+ }
+ // Adjust for proportion of auction events amongst all events.
+ longestDelayUs =
+ (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
+ / GeneratorConfig.AUCTION_PROPORTION;
+ // Adjust for number of in-flight auctions.
+ longestDelayUs = longestDelayUs * configuration.numInFlightAuctions;
+ long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000;
+ NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
+ auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
+ }
+
+ @Override
+ public PCollection<AuctionBid> expand(PCollection<Event> events) {
+ // Window auctions and bids into custom auction windows. New people events will be discarded.
+ // This will allow us to bring bids and auctions together irrespective of how long
+ // each auction is open for.
+ events = events.apply("Window", Window.into(auctionOrBidWindowFn));
+
+ // Key auctions by their id.
+ PCollection<KV<Long, Auction>> auctionsById =
+ events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+ .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
+
+ // Key bids by their auction id.
+ PCollection<KV<Long, Bid>> bidsByAuctionId =
+ events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
+
+ // Find the highest price valid bid for each closed auction.
+ return
+ // Join auctions and bids.
+ KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
+ .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+ .apply(CoGroupByKey.<Long>create())
+ // Filter and select.
+ .apply(name + ".Join",
+ ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
+ private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
+ private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
+ private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Auction auction =
+ c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+ if (auction == null) {
+ // We have bids without a matching auction. Give up.
+ noAuctionCounter.inc();
+ return;
+ }
+ // Find the current winning bid for auction.
+ // The earliest bid with the maximum price above the reserve wins.
+ Bid bestBid = null;
+ for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+ // Bids too late for their auction will have been
+ // filtered out by the window merge function.
+ checkState(bid.dateTime < auction.expires);
+ if (bid.price < auction.reserve) {
+ // Bid price is below auction reserve.
+ underReserveCounter.inc();
+ continue;
+ }
+
+ if (bestBid == null
+ || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
+ bestBid = bid;
+ }
+ }
+ if (bestBid == null) {
+ // We don't have any valid bids for auction.
+ noValidBidsCounter.inc();
+ return;
+ }
+ c.output(new AuctionBid(auction, bestBid));
+ }
+ }
+ ));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
new file mode 100644
index 0000000..7d74f8f
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A simulator of the {@code WinningBids} query.
+ */
+public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
+ /** Auctions currently still open, indexed by auction id. */
+ private final Map<Long, Auction> openAuctions;
+
+ /** The ids of auctions known to be closed. */
+ private final Set<Long> closedAuctions;
+
+ /** Current best valid bids for open auctions, indexed by auction id. */
+ private final Map<Long, Bid> bestBids;
+
+ /** Bids for auctions we haven't seen yet. */
+ private final List<Bid> bidsWithoutAuctions;
+
+ /**
+ * Timestamp of last new auction or bid event (ms since epoch).
+ */
+ private long lastTimestamp;
+
+ /**
+  * Create a simulator whose input is
+  * {@code NexmarkUtils.standardEventIterator(configuration)}. All bookkeeping
+  * collections start empty and {@code lastTimestamp} starts at
+  * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}, meaning no auction or bid has been seen.
+  */
+ public WinningBidsSimulator(NexmarkConfiguration configuration) {
+   super(NexmarkUtils.standardEventIterator(configuration));
+   this.lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+   this.bidsWithoutAuctions = new ArrayList<>();
+   this.bestBids = new TreeMap<>();
+   this.closedAuctions = new TreeSet<>();
+   this.openAuctions = new TreeMap<>();
+ }
+
+ /**
+  * Attempt to fold {@code bid} into the simulator state.
+  *
+  * @param bid the bid to account for
+  * @param shouldLog whether to report the outcome via {@code NexmarkUtils.info}
+  * @return true if the bid needs no further attention (its auction is known to be
+  *     closed, its price is below the reserve, or it has been compared against
+  *     {@code bestBids}); false if its auction is not yet open and the bid must be
+  *     retried later
+  */
+ private boolean captureBestBid(Bid bid, boolean shouldLog) {
+   if (closedAuctions.contains(bid.auction)) {
+     // The auction is already known to be closed, so this bid cannot count.
+     if (shouldLog) {
+       NexmarkUtils.info("closed auction: %s", bid);
+     }
+     return true;
+   }
+
+   Auction target = openAuctions.get(bid.auction);
+   if (target == null) {
+     // No auction seen for this bid yet, so we cannot judge it.
+     if (shouldLog) {
+       NexmarkUtils.info("pending auction: %s", bid);
+     }
+     return false;
+   }
+
+   if (bid.price < target.reserve) {
+     // The bid does not reach the auction's reserve price.
+     if (shouldLog) {
+       NexmarkUtils.info("below reserve: %s", bid);
+     }
+     return true;
+   }
+
+   // Compare against the incumbent best bid (if any) for the same auction.
+   Bid incumbent = bestBids.get(bid.auction);
+   boolean beatsIncumbent =
+       incumbent == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(incumbent, bid) < 0;
+   if (beatsIncumbent) {
+     bestBids.put(bid.auction, bid);
+   }
+   if (shouldLog) {
+     NexmarkUtils.info(beatsIncumbent ? "new winning bid: %s" : "ignoring low bid: %s", bid);
+   }
+   return true;
+ }
+
+ /**
+  * Retry every parked bid against the current state, dropping from
+  * {@code bidsWithoutAuctions} each one that {@code captureBestBid} now accounts for.
+  */
+ private void flushBidsWithoutAuctions() {
+   for (Iterator<Bid> pending = bidsWithoutAuctions.iterator(); pending.hasNext();) {
+     Bid candidate = pending.next();
+     if (!captureBestBid(candidate, false)) {
+       // Still no auction for this bid; leave it parked.
+       continue;
+     }
+     NexmarkUtils.info("bid now accounted for: %s", candidate);
+     pending.remove();
+   }
+ }
+
+ /**
+  * Return the next winning bid for an expired auction relative to {@code timestamp}.
+  * Return null if no more winning bids, in which case all expired auctions will
+  * have been removed from our state. Retire auctions in order of expire time.
+  *
+  * <p>Each retired auction is removed from {@code openAuctions} and {@code bestBids}
+  * and its id is recorded in {@code closedAuctions}, so that bids arriving later for
+  * it are ignored by {@code captureBestBid} instead of being parked indefinitely.
+  *
+  * @param timestamp auctions whose {@code expires} is at or before this time (ms since
+  *     epoch) are retired
+  * @return the winning bid of the earliest-expiring retired auction that has one,
+  *     timestamped at that auction's expiry, or null if none of the expired auctions
+  *     has a valid bid
+  */
+ @Nullable
+ private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
+   // Group the ids of expired auctions by expiry time. The TreeMap iterates in
+   // ascending key order, which gives us retirement in order of expiry.
+   Map<Long, List<Long>> toBeRetired = new TreeMap<>();
+   for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
+     long expires = entry.getValue().expires;
+     if (expires <= timestamp) {
+       List<Long> idsAtTime = toBeRetired.get(expires);
+       if (idsAtTime == null) {
+         idsAtTime = new ArrayList<>();
+         toBeRetired.put(expires, idsAtTime);
+       }
+       idsAtTime.add(entry.getKey());
+     }
+   }
+   for (List<Long> idsAtTime : toBeRetired.values()) {
+     for (long id : idsAtTime) {
+       Auction auction = openAuctions.get(id);
+       NexmarkUtils.info("retiring auction: %s", auction);
+       openAuctions.remove(id);
+       // Remember the auction is closed so late bids for it are dropped, not parked.
+       closedAuctions.add(id);
+       // The best bid is no longer needed once the auction has been retired.
+       Bid bestBid = bestBids.remove(id);
+       if (bestBid != null) {
+         TimestampedValue<AuctionBid> result =
+             TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
+         NexmarkUtils.info("winning: %s", result);
+         // Only one result per call; remaining expired auctions are retired on later calls.
+         return result;
+       }
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Perform one simulation step. Once an auction or bid has been seen, first retry
+  * parked bids and emit at most one winning bid for auctions expired as of
+  * {@code lastTimestamp}. Otherwise consume one input event, or, when the input is
+  * exhausted, flush remaining open auctions one result at a time and then finish.
+  */
+ @Override
+ protected void run() {
+   long minTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+   if (lastTimestamp > minTimestamp) {
+     // An auction may have arrived for a bid we parked earlier.
+     flushBidsWithoutAuctions();
+     TimestampedValue<AuctionBid> winner = nextWinningBid(lastTimestamp);
+     if (winner != null) {
+       addResult(winner);
+       return;
+     }
+   }
+
+   TimestampedValue<Event> next = nextInput();
+   if (next == null) {
+     // Input exhausted: retire whatever auctions are still open.
+     TimestampedValue<AuctionBid> winner =
+         nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+     if (winner != null) {
+       addResult(winner);
+       // TODO test fails because offset of some hundreds of ms between expect and actual
+     } else {
+       // Nothing left to emit.
+       allDone();
+     }
+     return;
+   }
+
+   Event event = next.getValue();
+   if (event.newPerson != null) {
+     // New person events play no part in this query.
+     return;
+   }
+
+   lastTimestamp = next.getTimestamp().getMillis();
+   if (event.newAuction != null) {
+     // Track the newly opened auction.
+     openAuctions.put(event.newAuction.id, event.newAuction);
+     return;
+   }
+   if (captureBestBid(event.bid, true)) {
+     return;
+   }
+   // The bid's auction is not known yet; park the bid and retry on later steps.
+   NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
+   bidsWithoutAuctions.add(event.bid);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index 284aa7e..b005d65 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -18,8 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.sdk.PipelineResult;