You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:36 UTC

[28/55] [abbrv] beam git commit: Move WinningBids into the queries package

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
index 12b16f1..71364ba 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
index e4b72d2..6b98e2a 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
@@ -26,9 +26,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
index 61991c8..9c0fe6d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4.java
@@ -19,9 +19,7 @@ package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.Monitor;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
index 9405ac8..634a58e 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query4Model.java
@@ -24,11 +24,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBidsSimulator;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
index 34b7b50..18ce578 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.AuctionCount;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
index 6bf65dc..24d9a00 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query5Model.java
@@ -24,9 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.AuctionCount;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
index 2a5ab702..65789ab 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6.java
@@ -22,9 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
index 4325337..0691714 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query6Model.java
@@ -22,11 +22,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBidsSimulator;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.Bid;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
index f3d1ba4..2a94ca9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
index 0a80e59..5c039f9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7Model.java
@@ -23,9 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Bid;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
index e7daccd..603841b 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
index 1161994..8c76bc6 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query8Model.java
@@ -24,9 +24,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.Auction;
 import org.apache.beam.integration.nexmark.model.Event;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
index aed827b..6dd189d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9.java
@@ -18,9 +18,7 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
 import org.apache.beam.integration.nexmark.model.AuctionBid;
 import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.integration.nexmark.model.KnownSize;

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
index b88d60a..d117e2d 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query9Model.java
@@ -21,10 +21,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.beam.integration.nexmark.AbstractSimulator;
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
-import org.apache.beam.integration.nexmark.WinningBidsSimulator;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
new file mode 100644
index 0000000..11a4d38
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBids.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * A transform to find the winning bid for each closed auction. In pseudo CQL syntax:
+ *
+ * <pre>{@code
+ * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
+ * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ * GROUP BY A.id
+ * }</pre>
+ *
+ * <p>We will also check that the winning bid is above the auction reserve. Note that
+ * we ignore the auction opening bid value since it has no impact on which bid eventually wins,
+ * if any.
+ *
+ * <p>Our implementation will use a custom windowing function in order to bring bids and
+ * auctions together without requiring global state.
+ */
+public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
+  /** Windows for open auctions and bids. */
+  private static class AuctionOrBidWindow extends IntervalWindow implements Serializable {
+    /** Id of auction this window is for. */
+    public final long auction;
+
+    /**
+     * True if this window represents an actual auction, and thus has a start/end
+     * time matching that of the auction. False if this window represents a bid, and
+     * thus has an unbounded start/end time.
+     */
+    public final boolean isAuctionWindow;
+
+    /** For avro only. */
+    private AuctionOrBidWindow() {
+      super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
+      auction = 0;
+      isAuctionWindow = false;
+    }
+
+    private AuctionOrBidWindow(
+        Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
+      super(start, end);
+      this.auction = auctionId;
+      this.isAuctionWindow = isAuctionWindow;
+    }
+
+    /** Return an auction window for {@code auction}. */
+    public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
+      AuctionOrBidWindow result =
+          new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
+      return result;
+    }
+
+    /**
+     * Return a bid window for {@code bid}. It should later be merged into
+     * the corresponding auction window. However, it is possible this bid is for an already
+     * expired auction, or for an auction which the system has not yet seen. So we
+     * give the bid a bit of wiggle room in its interval.
+     */
+    public static AuctionOrBidWindow forBid(
+        long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
+      // At this point we don't know which auctions are still valid, and the bid may
+      // be for an auction which won't start until some unknown time in the future
+      // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
+      // A real system would atomically reconcile bids and auctions by a separate mechanism.
+      // If we give bids an unbounded window it is possible a bid for an auction which
+      // has already expired would cause the system watermark to stall, since that window
+      // would never be retired.
+      // Instead, we will just give the bid a finite window which expires at
+      // the upper bound of auctions assuming the auction starts at the same time as the bid,
+      // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
+      AuctionOrBidWindow result = new AuctionOrBidWindow(
+          timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
+      return result;
+    }
+
+    /** Is this an auction window? */
+    public boolean isAuctionWindow() {
+      return isAuctionWindow;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
+          start(), end(), auction, isAuctionWindow);
+    }
+  }
+
+  /**
+   * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long.
+   */
+  private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
+    private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
+    private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
+    private static final Coder<Long> ID_CODER = VarLongCoder.of();
+    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+    @JsonCreator
+    public static AuctionOrBidWindowCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
+        throws IOException, CoderException {
+      SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
+      ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
+      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
+    }
+
+    @Override
+    public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
+        throws IOException, CoderException {
+      IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
+      long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
+      boolean isAuctionWindow =
+          INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
+      return new AuctionOrBidWindow(
+          superWindow.start(), superWindow.end(), auction, isAuctionWindow);
+    }
+
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
+  }
+
+  /** Assign events to auction windows and merges them intelligently. */
+  private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
+    /** Expected duration of auctions in ms. */
+    private final long expectedAuctionDurationMs;
+
+    public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
+      this.expectedAuctionDurationMs = expectedAuctionDurationMs;
+    }
+
+    @Override
+    public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
+      Event event = c.element();
+      if (event.newAuction != null) {
+        // Assign auctions to an auction window which expires at the auction's close.
+        return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
+      } else if (event.bid != null) {
+        // Assign bids to a temporary bid window which will later be merged into the appropriate
+        // auction window.
+        return Arrays.asList(
+            AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
+      } else {
+        // Don't assign people to any window. They will thus be dropped.
+        return Arrays.asList();
+      }
+    }
+
+    @Override
+    public void mergeWindows(MergeContext c) throws Exception {
+      // Split and index the auction and bid windows by auction id.
+      Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>();
+      Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>();
+      for (AuctionOrBidWindow window : c.windows()) {
+        if (window.isAuctionWindow()) {
+          idToTrueAuctionWindow.put(window.auction, window);
+        } else {
+          List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction);
+          if (bidWindows == null) {
+            bidWindows = new ArrayList<>();
+            idToBidAuctionWindows.put(window.auction, bidWindows);
+          }
+          bidWindows.add(window);
+        }
+      }
+
+      // Merge all 'bid' windows into their corresponding 'auction' window, provided the
+      // auction has not expired.
+      for (long auction : idToTrueAuctionWindow.keySet()) {
+        AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction);
+        List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
+        if (bidWindows != null) {
+          List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
+          for (AuctionOrBidWindow bidWindow : bidWindows) {
+            if (bidWindow.start().isBefore(auctionWindow.end())) {
+              toBeMerged.add(bidWindow);
+            }
+            // else: This bid window will remain until its expire time, at which point it
+            // will expire without ever contributing to an output.
+          }
+          if (!toBeMerged.isEmpty()) {
+            toBeMerged.add(auctionWindow);
+            c.merge(toBeMerged, auctionWindow);
+          }
+        }
+      }
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof AuctionOrBidWindowFn;
+    }
+
+    @Override
+    public Coder<AuctionOrBidWindow> windowCoder() {
+      return AuctionOrBidWindowCoder.of();
+    }
+
+    @Override
+    public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
+      throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
+    }
+
+    /**
+     * Below we will GBK auctions and bids on their auction ids. Then we will reduce those
+     * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at
+     * least one valid bid. We would like those output pairs to have a timestamp of the auction's
+     * expiry (since that's the earliest we know for sure we have the correct winner). We would
+     * also like to make that winning results are available to following stages at the auction's
+     * expiry.
+     *
+     * <p>Each result of the GBK will have a timestamp of the min of the result of this object's
+     * assignOutputTime over all records which end up in one of its iterables. Thus we get the
+     * desired behavior if we ignore each record's timestamp and always return the auction window's
+     * 'maxTimestamp', which will correspond to the auction's expiry.
+     *
+     * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp'
+     * (the usual implementation), then each GBK record will take as its timestamp the minimum of
+     * the timestamps of all bids and auctions within it, which will always be the auction's
+     * timestamp. An auction which expires well into the future would thus hold up the watermark
+     * of the GBK results until that auction expired. That in turn would hold up all winning pairs.
+     */
+    @Override
+    public Instant getOutputTime(
+        Instant inputTimestamp, AuctionOrBidWindow window) {
+      return window.maxTimestamp();
+    }
+  }
+
+  private final AuctionOrBidWindowFn auctionOrBidWindowFn;
+
+  public WinningBids(String name, NexmarkConfiguration configuration) {
+    super(name);
+    // What's the expected auction time (when the system is running at the lowest event rate).
+    long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    long longestDelayUs = 0;
+    for (int i = 0; i < interEventDelayUs.length; i++) {
+      longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]);
+    }
+    // Adjust for proportion of auction events amongst all events.
+    longestDelayUs =
+        (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
+        / GeneratorConfig.AUCTION_PROPORTION;
+    // Adjust for number of in-flight auctions.
+    longestDelayUs = longestDelayUs * configuration.numInFlightAuctions;
+    long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000;
+    NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
+    auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
+  }
+
+  @Override
+  public PCollection<AuctionBid> expand(PCollection<Event> events) {
+    // Window auctions and bids into custom auction windows. New people events will be discarded.
+    // This will allow us to bring bids and auctions together irrespective of how long
+    // each auction is open for.
+    events = events.apply("Window", Window.into(auctionOrBidWindowFn));
+
+    // Key auctions by their id.
+    PCollection<KV<Long, Auction>> auctionsById =
+        events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+              .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
+
+    // Key bids by their auction id.
+    PCollection<KV<Long, Bid>> bidsByAuctionId =
+        events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
+
+    // Find the highest price valid bid for each closed auction.
+    return
+      // Join auctions and bids.
+      KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
+        .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+        .apply(CoGroupByKey.<Long>create())
+        // Filter and select.
+        .apply(name + ".Join",
+          ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
+            private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
+            private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
+            private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
+
+            @ProcessElement
+            public void processElement(ProcessContext c) {
+              Auction auction =
+                  c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+              if (auction == null) {
+                // We have bids without a matching auction. Give up.
+                noAuctionCounter.inc();
+                return;
+              }
+              // Find the current winning bid for auction.
+              // The earliest bid with the maximum price above the reserve wins.
+              Bid bestBid = null;
+              for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+                // Bids too late for their auction will have been
+                // filtered out by the window merge function.
+                checkState(bid.dateTime < auction.expires);
+                if (bid.price < auction.reserve) {
+                  // Bid price is below auction reserve.
+                  underReserveCounter.inc();
+                  continue;
+                }
+
+                if (bestBid == null
+                    || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
+                  bestBid = bid;
+                }
+              }
+              if (bestBid == null) {
+                // We don't have any valid bids for auction.
+                noValidBidsCounter.inc();
+                return;
+              }
+              c.output(new AuctionBid(auction, bestBid));
+            }
+          }
+        ));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
new file mode 100644
index 0000000..7d74f8f
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/WinningBidsSimulator.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A simulator of the {@code WinningBids} query.
+ */
+public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
+  /** Auctions currently still open, indexed by auction id. */
+  private final Map<Long, Auction> openAuctions;
+
+  /** The ids of auctions known to be closed. */
+  private final Set<Long> closedAuctions;
+
+  /** Current best valid bids for open auctions, indexed by auction id. */
+  private final Map<Long, Bid> bestBids;
+
+  /** Bids for auctions we havn't seen yet. */
+  private final List<Bid> bidsWithoutAuctions;
+
+  /**
+   * Timestamp of last new auction or bid event (ms since epoch).
+   */
+  private long lastTimestamp;
+
+  public WinningBidsSimulator(NexmarkConfiguration configuration) {
+    super(NexmarkUtils.standardEventIterator(configuration));
+    openAuctions = new TreeMap<>();
+    closedAuctions = new TreeSet<>();
+    bestBids = new TreeMap<>();
+    bidsWithoutAuctions = new ArrayList<>();
+    lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+  }
+
+  /**
+   * Try to account for {@code bid} in state. Return true if bid has now been
+   * accounted for by {@code bestBids}.
+   */
+  private boolean captureBestBid(Bid bid, boolean shouldLog) {
+    if (closedAuctions.contains(bid.auction)) {
+      // Ignore bids for known, closed auctions.
+      if (shouldLog) {
+        NexmarkUtils.info("closed auction: %s", bid);
+      }
+      return true;
+    }
+    Auction auction = openAuctions.get(bid.auction);
+    if (auction == null) {
+      // We don't have an auction for this bid yet, so can't determine if it is
+      // winning or not.
+      if (shouldLog) {
+        NexmarkUtils.info("pending auction: %s", bid);
+      }
+      return false;
+    }
+    if (bid.price < auction.reserve) {
+      // Bid price is too low.
+      if (shouldLog) {
+        NexmarkUtils.info("below reserve: %s", bid);
+      }
+      return true;
+    }
+    Bid existingBid = bestBids.get(bid.auction);
+    if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
+      // We've found a (new) best bid for a known auction.
+      bestBids.put(bid.auction, bid);
+      if (shouldLog) {
+        NexmarkUtils.info("new winning bid: %s", bid);
+      }
+    } else {
+      if (shouldLog) {
+        NexmarkUtils.info("ignoring low bid: %s", bid);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Try to match bids without auctions to auctions.
+   */
+  private void flushBidsWithoutAuctions() {
+    Iterator<Bid> itr = bidsWithoutAuctions.iterator();
+    while (itr.hasNext()) {
+      Bid bid = itr.next();
+      if (captureBestBid(bid, false)) {
+        NexmarkUtils.info("bid now accounted for: %s", bid);
+        itr.remove();
+      }
+    }
+  }
+
+  /**
+   * Return the next winning bid for an expired auction relative to {@code timestamp}.
+   * Return null if no more winning bids, in which case all expired auctions will
+   * have been removed from our state. Retire auctions in order of expire time.
+   */
+  @Nullable
+  private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
+    Map<Long, List<Long>> toBeRetired = new TreeMap<>();
+    for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
+      if (entry.getValue().expires <= timestamp) {
+        List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires);
+        if (idsAtTime == null) {
+          idsAtTime = new ArrayList<>();
+          toBeRetired.put(entry.getValue().expires, idsAtTime);
+        }
+        idsAtTime.add(entry.getKey());
+      }
+    }
+    for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) {
+      for (long id : entry.getValue()) {
+        Auction auction = openAuctions.get(id);
+        NexmarkUtils.info("retiring auction: %s", auction);
+        openAuctions.remove(id);
+        Bid bestBid = bestBids.get(id);
+        if (bestBid != null) {
+          TimestampedValue<AuctionBid> result =
+              TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
+          NexmarkUtils.info("winning: %s", result);
+          return result;
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  protected void run() {
+    if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+      // We may have finally seen the auction a bid was intended for.
+      flushBidsWithoutAuctions();
+      TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp);
+      if (result != null) {
+        addResult(result);
+        return;
+      }
+    }
+
+    TimestampedValue<Event> timestampedEvent = nextInput();
+    if (timestampedEvent == null) {
+      // No more events. Flush any still open auctions.
+      TimestampedValue<AuctionBid> result =
+          nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+      if (result == null) {
+        // We are done.
+        allDone();
+        return;
+      }
+      addResult(result);
+      //TODO test fails because offset of some hundreds of ms beween expect and actual
+      return;
+    }
+
+    Event event = timestampedEvent.getValue();
+    if (event.newPerson != null) {
+      // Ignore new person events.
+      return;
+    }
+
+    lastTimestamp = timestampedEvent.getTimestamp().getMillis();
+    if (event.newAuction != null) {
+      // Add this new open auction to our state.
+      openAuctions.put(event.newAuction.id, event.newAuction);
+    } else {
+      if (!captureBestBid(event.bid, true)) {
+        // We don't know what to do with this bid yet.
+        NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
+        bidsWithoutAuctions.add(event.bid);
+      }
+    }
+    // Keep looking for winning bids.
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
index 284aa7e..b005d65 100644
--- a/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
+++ b/integration/java/nexmark/src/test/java/org/apache/beam/integration/nexmark/queries/QueryTest.java
@@ -18,8 +18,6 @@
 package org.apache.beam.integration.nexmark.queries;
 
 import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
 import org.apache.beam.integration.nexmark.NexmarkUtils;
 import org.apache.beam.integration.nexmark.model.KnownSize;
 import org.apache.beam.sdk.PipelineResult;