You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:50 UTC
[42/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
new file mode 100644
index 0000000..3c1cf3b
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.Monitor;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.CategoryPrice;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Mean;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query 4, 'Average Price for a Category'. Select the average of the winning bid prices for all
+ * closed auctions in each category. In CQL syntax:
+ *
+ * <pre>{@code
+ * SELECT Istream(AVG(Q.final))
+ * FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category)
+ * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ * GROUP BY A.id, A.category) Q
+ * WHERE Q.category = C.id
+ * GROUP BY C.id;
+ * }</pre>
+ *
+ * <p>For extra spiciness our implementation differs slightly from the above:
+ * <ul>
+ * <li>We select both the average winning price and the category.
+ * <li>We don't bother joining with a static category table, since its contents are never used.
+ * <li>We only consider bids which are above the auction's reserve price.
+ * <li>We accept the highest-price, earliest valid bid as the winner.
+ * <li>We calculate the averages over a sliding window of size {@code windowSizeSec} and
+ * period {@code windowPeriodSec}.
+ * </ul>
+ */
+public class Query4 extends NexmarkQuery {
+ private final Monitor<AuctionBid> winningBidsMonitor;
+
+ public Query4(NexmarkConfiguration configuration) {
+ super(configuration, "Query4");
+ winningBidsMonitor = new Monitor<>(name + ".WinningBids", "winning");
+ }
+
+ private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) {
+ PCollection<AuctionBid> winningBids =
+ events
+ // Find the winning bid for each closed auction.
+ .apply(new WinningBids(name + ".WinningBids", configuration));
+
+ // Monitor winning bids
+ winningBids = winningBids.apply(name + ".WinningBidsMonitor",
+ winningBidsMonitor.getTransform());
+
+ return winningBids
+ // Key the winning bid price by the auction category.
+ .apply(name + ".Rekey",
+ ParDo.of(new DoFn<AuctionBid, KV<Long, Long>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Auction auction = c.element().auction;
+ Bid bid = c.element().bid;
+ c.output(KV.of(auction.category, bid.price));
+ }
+ }))
+
+ // Re-window so we can calculate a sliding average
+ .apply(Window.<KV<Long, Long>>into(
+ SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
+ .every(Duration.standardSeconds(configuration.windowPeriodSec))))
+
+ // Find the average of the winning bids for each category.
+ // Make sure we share the work for each category between workers.
+ .apply(Mean.<Long, Long>perKey().withHotKeyFanout(configuration.fanout))
+
+ // For testing against Query4Model, capture which results are 'final'.
+ .apply(name + ".Project",
+ ParDo.of(new DoFn<KV<Long, Double>, CategoryPrice>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(new CategoryPrice(c.element().getKey(),
+ Math.round(c.element().getValue()), c.pane().isLast()));
+ }
+ }));
+ }
+
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
new file mode 100644
index 0000000..84274a8
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query4Model.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.CategoryPrice;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+/**
+ * A direct implementation of {@link Query4}.
+ */
+public class Query4Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 4. It consumes the output of {@link WinningBidsSimulator} and emits
+   * per-category average winning prices as sliding windows are retired.
+   */
+  private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> {
+    /** Winning prices (with their category) that may still fall into an open window. */
+    private final List<TimestampedValue<CategoryPrice>> winningPricesByCategory;
+
+    /** Timestamp of the most recently consumed winning bid. */
+    private Instant lastTimestamp;
+
+    /** Start of the oldest window that has not yet been retired. */
+    private Instant windowStart;
+
+    /** Most recent average emitted for each category, keyed by category id. */
+    private final Map<Long, TimestampedValue<CategoryPrice>> lastSeenResults;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(new WinningBidsSimulator(configuration).results());
+      winningPricesByCategory = new ArrayList<>();
+      lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+      lastSeenResults = new TreeMap<>();
+    }
+
+    /**
+     * Emits, for every category, the rounded average price of the captured winning bids whose
+     * timestamp is strictly before {@code end}. Each average is stamped with
+     * {@code lastTimestamp}, added as an intermediate result and remembered in
+     * {@code lastSeenResults}.
+     */
+    private void averages(Instant end) {
+      // category -> {number of winning bids, sum of their prices}, iterated in category order.
+      Map<Long, long[]> statsByCategory = new TreeMap<>();
+      for (TimestampedValue<CategoryPrice> value : winningPricesByCategory) {
+        if (value.getTimestamp().isBefore(end)) {
+          CategoryPrice categoryPrice = value.getValue();
+          long[] stats = statsByCategory.get(categoryPrice.category);
+          if (stats == null) {
+            stats = new long[2];
+            statsByCategory.put(categoryPrice.category, stats);
+          }
+          stats[0]++;
+          stats[1] += categoryPrice.price;
+        }
+      }
+      for (Map.Entry<Long, long[]> entry : statsByCategory.entrySet()) {
+        long category = entry.getKey();
+        long count = entry.getValue()[0];
+        long total = entry.getValue()[1];
+        TimestampedValue<CategoryPrice> result = TimestampedValue.of(
+            new CategoryPrice(category, Math.round((double) total / count), true), lastTimestamp);
+        addIntermediateResult(result);
+        lastSeenResults.put(category, result);
+      }
+    }
+
+    /**
+     * Retires windows until {@code windowStart} reaches {@code newWindowStart}. For each retired
+     * window, its averages are emitted. Captured prices older than the next window's start are
+     * then dropped. When nothing is left, jumps directly to {@code newWindowStart}.
+     */
+    private void prune(Instant newWindowStart) {
+      while (!windowStart.equals(newWindowStart)) {
+        Instant retiringWindowEnd =
+            windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec));
+        averages(retiringWindowEnd);
+        windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
+        for (Iterator<TimestampedValue<CategoryPrice>> it = winningPricesByCategory.iterator();
+            it.hasNext(); ) {
+          if (it.next().getTimestamp().isBefore(windowStart)) {
+            it.remove();
+          }
+        }
+        if (winningPricesByCategory.isEmpty()) {
+          // Nothing captured can contribute to an intermediate window: skip to the target.
+          windowStart = newWindowStart;
+        }
+      }
+    }
+
+    /** Records the winning price of {@code auction} under its category at {@code timestamp}. */
+    private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
+      CategoryPrice captured = new CategoryPrice(auction.category, bid.price, false);
+      winningPricesByCategory.add(TimestampedValue.of(captured, timestamp));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<AuctionBid> next = nextInput();
+      if (next == null) {
+        // Input is exhausted. Retire every remaining window, then report the last average
+        // seen for each category as a final result.
+        prune(NexmarkUtils.END_OF_TIME);
+        for (TimestampedValue<CategoryPrice> finalResult : lastSeenResults.values()) {
+          addResult(finalResult);
+        }
+        allDone();
+      } else {
+        lastTimestamp = next.getTimestamp();
+        prune(windowStart(Duration.standardSeconds(configuration.windowSizeSec),
+            Duration.standardSeconds(configuration.windowPeriodSec), lastTimestamp));
+        AuctionBid winner = next.getValue();
+        captureWinningBid(winner.auction, winner.bid, lastTimestamp);
+      }
+    }
+  }
+
+  public Query4Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+      Iterable<TimestampedValue<KnownSize>> results) {
+    // For each category, keep the last result (in processing order) that is flagged as final.
+    Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
+    for (TimestampedValue<KnownSize> result : results) {
+      KnownSize value = result.getValue();
+      Assert.assertTrue("have CategoryPrice", value instanceof CategoryPrice);
+      CategoryPrice categoryPrice = (CategoryPrice) value;
+      if (!categoryPrice.isLast) {
+        continue;
+      }
+      finalAverages.put(categoryPrice.category,
+          TimestampedValue.of((KnownSize) categoryPrice, result.getTimestamp()));
+    }
+    return finalAverages.values();
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
new file mode 100644
index 0000000..d027cb3
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.AuctionCount;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every
+ * minute). In CQL syntax:
+ *
+ * <pre>{@code
+ * SELECT Rstream(auction)
+ * FROM (SELECT B1.auction, count(*) AS num
+ * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
+ * GROUP BY B1.auction)
+ * WHERE num >= ALL (SELECT count(*)
+ * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
+ * GROUP BY B2.auction);
+ * }</pre>
+ *
+ * <p>To make things a bit more dynamic and easier to test we use much shorter windows, and
+ * we'll also preserve the bid counts.
+ */
+public class Query5 extends NexmarkQuery {
+  public Query5(NexmarkConfiguration configuration) {
+    super(configuration, "Query5");
+  }
+
+  /**
+   * Builds the typed pipeline: bids are windowed, counted per auction, and reduced to the
+   * auction id(s) with the highest bid count in each window.
+   */
+  private PCollection<AuctionCount> applyTyped(PCollection<Event> events) {
+    Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
+    Duration windowPeriod = Duration.standardSeconds(configuration.windowPeriodSec);
+
+    // Keep only bids, assign them to sliding windows and reduce each bid to its auction id.
+    PCollection<Long> auctionIds =
+        events
+            .apply(JUST_BIDS)
+            .apply(Window.<Bid>into(SlidingWindows.of(windowSize).every(windowPeriod)))
+            .apply("BidToAuction", BID_TO_AUCTION);
+
+    // Number of bids per auction id within each window.
+    PCollection<KV<Long, Long>> bidCounts = auctionIds.apply(Count.<Long>perElement());
+
+    // Lift each (auction, count) pair into a singleton list of auction ids. The combine below
+    // must consume and produce the same type, and on a tie it returns every tied auction id.
+    PCollection<KV<List<Long>, Long>> singletons = bidCounts.apply(name + ".ToSingletons",
+        ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            KV<Long, Long> auctionCount = c.element();
+            List<Long> auctions = Collections.singletonList(auctionCount.getKey());
+            c.output(KV.of(auctions, auctionCount.getValue()));
+          }
+        }));
+
+    // Keep only the auction ids with the highest bid count in each window.
+    PCollection<KV<List<Long>, Long>> mostBidAuctions = singletons.apply(
+        Combine.globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
+          @Override
+          public KV<List<Long>, Long> apply(
+              KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
+            long leftCount = left.getValue();
+            long rightCount = right.getValue();
+            if (leftCount != rightCount) {
+              return leftCount > rightCount ? left : right;
+            }
+            // Tie: report the auctions from both sides.
+            List<Long> tiedAuctions = new ArrayList<>();
+            tiedAuctions.addAll(left.getKey());
+            tiedAuctions.addAll(right.getKey());
+            return KV.of(tiedAuctions, leftCount);
+          }
+        })
+        .withoutDefaults()
+        .withFanout(configuration.fanout));
+
+    // Emit one AuctionCount for every auction id in each window's winning list.
+    return mostBidAuctions.apply(name + ".Select",
+        ParDo.of(new DoFn<KV<List<Long>, Long>, AuctionCount>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            KV<List<Long>, Long> winners = c.element();
+            long count = winners.getValue();
+            for (long auction : winners.getKey()) {
+              c.output(new AuctionCount(auction, count));
+            }
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<AuctionCount> results = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, results);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
new file mode 100644
index 0000000..7ed0709
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query5Model.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.AuctionCount;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A direct implementation of {@link Query5}.
+ */
+public class Query5Model extends NexmarkQueryModel implements Serializable {
+ /**
+ * Simulator for query 5.
+ */
+ private class Simulator extends AbstractSimulator<Event, AuctionCount> {
+ /** Time of bids still contributing to open windows, indexed by their auction id. */
+ private final Map<Long, List<Instant>> bids;
+
+ /** When oldest active window starts. */
+ private Instant windowStart;
+
+ public Simulator(NexmarkConfiguration configuration) {
+ super(NexmarkUtils.standardEventIterator(configuration));
+ bids = new TreeMap<>();
+ windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+ }
+
+ /**
+ * Count bids per auction id for bids strictly before {@code end}. Add the auction ids with
+ * the maximum number of bids to results.
+ */
+ private void countBids(Instant end) {
+ Map<Long, Long> counts = new TreeMap<>();
+ long maxCount = 0L;
+ for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) {
+ long count = 0L;
+ long auction = entry.getKey();
+ for (Instant bid : entry.getValue()) {
+ if (bid.isBefore(end)) {
+ count++;
+ }
+ }
+ if (count > 0) {
+ counts.put(auction, count);
+ maxCount = Math.max(maxCount, count);
+ }
+ }
+ for (Map.Entry<Long, Long> entry : counts.entrySet()) {
+ long auction = entry.getKey();
+ long count = entry.getValue();
+ if (count == maxCount) {
+ AuctionCount result = new AuctionCount(auction, count);
+ addResult(TimestampedValue.of(result, end));
+ }
+ }
+ }
+
+ /**
+ * Retire bids which are strictly before {@code cutoff}. Return true if there are any bids
+ * remaining.
+ */
+ private boolean retireBids(Instant cutoff) {
+ boolean anyRemain = false;
+ for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) {
+ long auction = entry.getKey();
+ Iterator<Instant> itr = entry.getValue().iterator();
+ while (itr.hasNext()) {
+ Instant bid = itr.next();
+ if (bid.isBefore(cutoff)) {
+ NexmarkUtils.info("retire: %s for %s", bid, auction);
+ itr.remove();
+ } else {
+ anyRemain = true;
+ }
+ }
+ }
+ return anyRemain;
+ }
+
+ /**
+ * Retire active windows until we've reached {@code newWindowStart}.
+ */
+ private void retireWindows(Instant newWindowStart) {
+ while (!newWindowStart.equals(windowStart)) {
+ NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart);
+ // Count bids in the window (windowStart, windowStart + size].
+ countBids(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
+ // Advance the window.
+ windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
+ // Retire bids which will never contribute to a future window.
+ if (!retireBids(windowStart)) {
+ // Can fast forward to latest window since no more outstanding bids.
+ windowStart = newWindowStart;
+ }
+ }
+ }
+
+ /**
+ * Add bid to state.
+ */
+ private void captureBid(Bid bid, Instant timestamp) {
+ List<Instant> existing = bids.get(bid.auction);
+ if (existing == null) {
+ existing = new ArrayList<>();
+ bids.put(bid.auction, existing);
+ }
+ existing.add(timestamp);
+ }
+
+ @Override
+ public void run() {
+ TimestampedValue<Event> timestampedEvent = nextInput();
+ if (timestampedEvent == null) {
+ // Drain the remaining windows.
+ retireWindows(NexmarkUtils.END_OF_TIME);
+ allDone();
+ return;
+ }
+
+ Event event = timestampedEvent.getValue();
+ if (event.bid == null) {
+ // Ignore non-bid events.
+ return;
+ }
+ Instant timestamp = timestampedEvent.getTimestamp();
+ Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
+ Duration.standardSeconds(configuration.windowPeriodSec), timestamp);
+ // Capture results from any windows we can now retire.
+ retireWindows(newWindowStart);
+ // Capture current bid.
+ captureBid(event.bid, timestamp);
+ }
+ }
+
+ public Query5Model(NexmarkConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public AbstractSimulator<?, ?> simulator() {
+ return new Simulator(configuration);
+ }
+
+ @Override
+ protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+ return toValue(itr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
new file mode 100644
index 0000000..bc6b12c
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.SellerPrice;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the
+ * last 10 closed auctions by the same seller. In CQL syntax:
+ *
+ * <pre>{@code
+ * SELECT Istream(AVG(Q.final), Q.seller)
+ * FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
+ * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ * GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
+ * GROUP BY Q.seller;
+ * }</pre>
+ *
+ * <p>We are a little more exact with selecting winning bids: see {@link WinningBids}.
+ */
+public class Query6 extends NexmarkQuery {
+  /**
+   * Combiner that keeps up to {@code maxNumBids} of the most recent winning bids, ordered by
+   * {@code Bid.ASCENDING_TIME_THEN_PRICE}, and outputs their rounded average selling price
+   * (0 when the accumulator is empty).
+   */
+  private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> {
+    private final int maxNumBids;
+
+    public MovingMeanSellingPrice(int maxNumBids) {
+      this.maxNumBids = maxNumBids;
+    }
+
+    @Override
+    public List<Bid> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<Bid> addInput(List<Bid> accumulator, Bid input) {
+      accumulator.add(input);
+      Collections.sort(accumulator, Bid.ASCENDING_TIME_THEN_PRICE);
+      // Once over the limit, drop the oldest bid (the head of the ascending order).
+      if (accumulator.size() > maxNumBids) {
+        accumulator.remove(0);
+      }
+      return accumulator;
+    }
+
+    @Override
+    public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
+      List<Bid> merged = new ArrayList<>();
+      for (List<Bid> accumulator : accumulators) {
+        merged.addAll(accumulator);
+      }
+      Collections.sort(merged, Bid.ASCENDING_TIME_THEN_PRICE);
+      int excess = merged.size() - maxNumBids;
+      if (excess <= 0) {
+        return merged;
+      }
+      // Keep only the newest maxNumBids bids, i.e. the tail of the ascending order.
+      return Lists.newArrayList(merged.subList(excess, merged.size()));
+    }
+
+    @Override
+    public Long extractOutput(List<Bid> accumulator) {
+      if (accumulator.isEmpty()) {
+        return 0L;
+      }
+      long totalPrice = 0;
+      for (Bid bid : accumulator) {
+        totalPrice += bid.price;
+      }
+      double mean = (double) totalPrice / accumulator.size();
+      return Math.round(mean);
+    }
+  }
+
+  public Query6(NexmarkConfiguration configuration) {
+    super(configuration, "Query6");
+  }
+
+  /**
+   * Builds the typed pipeline: winning bids are keyed by seller and a moving average of each
+   * seller's last 10 winning prices is emitted on every new winning bid.
+   */
+  private PCollection<SellerPrice> applyTyped(PCollection<Event> events) {
+    // Determine the winning bid of every closed auction.
+    PCollection<AuctionBid> winningBids =
+        events.apply(new WinningBids(name + ".WinningBids", configuration));
+
+    // Key each winning bid by the seller of its auction.
+    PCollection<KV<Long, Bid>> bidsBySeller = winningBids.apply(name + ".Rekey",
+        ParDo.of(new DoFn<AuctionBid, KV<Long, Bid>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            AuctionBid winner = c.element();
+            Auction auction = winner.auction;
+            Bid bid = winner.bid;
+            c.output(KV.of(auction.seller, bid));
+          }
+        }));
+
+    // Re-window into the global window, firing after every element with accumulating panes,
+    // so that each winning bid yields an updated average.
+    PCollection<KV<Long, Bid>> triggeredBids = bidsBySeller.apply(
+        Window.<KV<Long, Bid>>into(new GlobalWindows())
+            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+            .accumulatingFiredPanes()
+            .withAllowedLateness(Duration.ZERO));
+
+    // Average of the last 10 winning bids for each seller.
+    PCollection<KV<Long, Long>> averagePriceBySeller =
+        triggeredBids.apply(Combine.<Long, Bid, Long>perKey(new MovingMeanSellingPrice(10)));
+
+    // Project into our datatype.
+    return averagePriceBySeller.apply(name + ".Select",
+        ParDo.of(new DoFn<KV<Long, Long>, SellerPrice>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            KV<Long, Long> sellerAverage = c.element();
+            c.output(new SellerPrice(sellerAverage.getKey(), sellerAverage.getValue()));
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<SellerPrice> results = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, results);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
new file mode 100644
index 0000000..b5152d8
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query6Model.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.SellerPrice;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+/**
+ * A direct implementation of {@link Query6}.
+ *
+ * <p>Like {@link Query6}, it reports for each seller the mean price of that seller's last 10
+ * winning bids, not the mean over all of the seller's winning bids.
+ */
+public class Query6Model extends NexmarkQueryModel implements Serializable {
+ /**
+ * Simulator for query 6.
+ */
+ private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> {
+ /** Number of most recent winning bids averaged per seller; mirrors Query6's combiner. */
+ private static final int MAX_NUM_BIDS = 10;
+
+ /**
+ * Prices of the most recent (at most {@link #MAX_NUM_BIDS}) winning bids, oldest first,
+ * indexed by seller id.
+ */
+ private final Map<Long, Deque<Long>> recentWinningBidPricesPerSeller;
+
+ /** Sum of the prices held in {@link #recentWinningBidPricesPerSeller}, by seller id. */
+ private final Map<Long, Long> totalWinningBidPricesPerSeller;
+
+ /** Timestamp of the most recently processed winning bid. */
+ private Instant lastTimestamp;
+
+ public Simulator(NexmarkConfiguration configuration) {
+ super(new WinningBidsSimulator(configuration).results());
+ recentWinningBidPricesPerSeller = new TreeMap<>();
+ totalWinningBidPricesPerSeller = new TreeMap<>();
+ lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /** Mean of {@code count} prices summing to {@code total}, rounded the same way Query6 does. */
+ private static long average(long total, int count) {
+ return Math.round((double) total / count);
+ }
+
+ /**
+ * Slide the seller's window of winning bid prices forward by one bid, then emit the seller's
+ * new average as an intermediate result.
+ *
+ * <p>NOTE(review): the window follows the order in which winning bids arrive from
+ * {@code WinningBidsSimulator}; Query6's combiner orders the bids itself. Confirm the two
+ * orders agree.
+ */
+ private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
+ NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid);
+ Deque<Long> prices = recentWinningBidPricesPerSeller.get(auction.seller);
+ if (prices == null) {
+ prices = new ArrayDeque<>();
+ recentWinningBidPricesPerSeller.put(auction.seller, prices);
+ }
+ Long previousTotal = totalWinningBidPricesPerSeller.get(auction.seller);
+ long total = previousTotal == null ? 0L : previousTotal;
+ prices.addLast(bid.price);
+ total += bid.price;
+ if (prices.size() > MAX_NUM_BIDS) {
+ // Only the last MAX_NUM_BIDS winning bids count: forget the oldest price.
+ total -= prices.removeFirst();
+ }
+ totalWinningBidPricesPerSeller.put(auction.seller, total);
+ TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of(
+ new SellerPrice(auction.seller, average(total, prices.size())), timestamp);
+ addIntermediateResult(intermediateResult);
+ }
+
+
+ @Override
+ protected void run() {
+ TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
+ if (timestampedWinningBid == null) {
+ // Input exhausted: emit the final average for every seller, in seller-id order.
+ for (Map.Entry<Long, Deque<Long>> entry : recentWinningBidPricesPerSeller.entrySet()) {
+ long seller = entry.getKey();
+ int count = entry.getValue().size();
+ long total = totalWinningBidPricesPerSeller.get(seller);
+ addResult(TimestampedValue.of(
+ new SellerPrice(seller, average(total, count)), lastTimestamp));
+ }
+ allDone();
+ return;
+ }
+
+ lastTimestamp = timestampedWinningBid.getTimestamp();
+ captureWinningBid(timestampedWinningBid.getValue().auction,
+ timestampedWinningBid.getValue().bid, lastTimestamp);
+ }
+ }
+
+ public Query6Model(NexmarkConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public AbstractSimulator<?, ?> simulator() {
+ return new Simulator(configuration);
+ }
+
+ @Override
+ protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+ Iterable<TimestampedValue<KnownSize>> results) {
+ // Find the last (in processing time) reported average price for each seller.
+ Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
+ for (TimestampedValue<KnownSize> obj : results) {
+ Assert.assertTrue("have SellerPrice", obj.getValue() instanceof SellerPrice);
+ SellerPrice sellerPrice = (SellerPrice) obj.getValue();
+ finalAverages.put(
+ sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, obj.getTimestamp()));
+ }
+ return finalAverages.values();
+ }
+
+ @Override
+ protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+ return toValue(itr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
new file mode 100644
index 0000000..71b75c3
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import org.joda.time.Duration;
+
+/**
+ * Query 7, 'Highest Bid'. Selects, for each window, the bids whose price equals the highest bid
+ * price seen in that window. In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(B.auction, B.price, B.bidder)
+ * FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
+ * WHERE B.price = (SELECT MAX(B1.price)
+ * FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);
+ * </pre>
+ *
+ * <p>Instead of one minute, the window here is {@code configuration.windowSizeSec} seconds, which
+ * keeps tests short. The per-window maximum is delivered through a side input so that the query
+ * exercises that feature; a combiner, as used in Query 5, would be more efficient.
+ */
+public class Query7 extends NexmarkQuery {
+ /** Creates Query 7 for the given configuration. */
+ public Query7(NexmarkConfiguration configuration) {
+ super(configuration, "Query7");
+ }
+
+ private PCollection<Bid> applyTyped(PCollection<Event> events) {
+ // Assign every bid to a fixed (tumbling) window of the configured size.
+ Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
+ PCollection<Bid> windowedBids =
+ events.apply(JUST_BIDS).apply(Window.<Bid>into(FixedWindows.of(windowSize)));
+
+ // Compute the maximum price within each window and publish it as a singleton side input.
+ // NOTE: a binary combiner (as in Query5) would avoid this extra pass over every window and
+ // the associated snapshotted state and I/O; the side input is kept only to illustrate it.
+ PCollection<Long> prices = windowedBids.apply("BidToPrice", BID_TO_PRICE);
+ final PCollectionView<Long> maxPriceView =
+ prices.apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
+
+ // Keep every bid whose price equals its window's maximum; ties all survive.
+ return windowedBids.apply(name + ".Select",
+ ParDo.of(new DoFn<Bid, Bid>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ long windowMax = c.sideInput(maxPriceView);
+ Bid candidate = c.element();
+ if (candidate.price != windowMax) {
+ return;
+ }
+ c.output(candidate);
+ }
+ }).withSideInputs(maxPriceView));
+ }
+
+ /** Builds the typed pipeline and widens it to {@link KnownSize}. */
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ PCollection<Bid> highestBids = applyTyped(events);
+ return NexmarkUtils.castToKnownSize(name, highestBids);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
new file mode 100644
index 0000000..4011746
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query7Model.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A direct implementation of {@link Query7}.
+ */
+public class Query7Model extends NexmarkQueryModel implements Serializable {
+ /**
+ * Simulator for query 7.
+ */
+ private class Simulator extends AbstractSimulator<Event, Bid> {
+ /**
+ * Bids with the highest bid price seen in the current window. All retained bids share the
+ * same price, because {@link #captureBid} only adds a bid whose price is at least the
+ * current maximum, and it clears the list when the maximum rises.
+ */
+ private final List<Bid> highestBids;
+
+ /** When current window started. */
+ private Instant windowStart;
+
+ /** Timestamp of the most recently seen bid event. */
+ private Instant lastTimestamp;
+
+ public Simulator(NexmarkConfiguration configuration) {
+ super(NexmarkUtils.standardEventIterator(configuration));
+ highestBids = new ArrayList<>();
+ windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+ lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * Transfer the currently winning bids into results and retire them.
+ *
+ * <p>NOTE(review): on a window change {@link #run} passes the timestamp of the first bid of
+ * the new window, not one from the window being retired.
+ */
+ private void retireWindow(Instant timestamp) {
+ for (Bid bid : highestBids) {
+ addResult(TimestampedValue.of(bid, timestamp));
+ }
+ highestBids.clear();
+ }
+
+ /**
+ * Keep just the highest price bids of the current window.
+ *
+ * <p>A bid priced below the current maximum is ignored. A strictly higher price evicts the
+ * previous holders. An equal price joins them, because Query7 emits every bid that matches
+ * the window's maximum price.
+ */
+ private void captureBid(Bid bid) {
+ if (!highestBids.isEmpty()) {
+ long currentMax = highestBids.get(0).price;
+ if (bid.price < currentMax) {
+ return;
+ }
+ if (bid.price > currentMax) {
+ for (Bid existingBid : highestBids) {
+ NexmarkUtils.info("smaller price: %s", existingBid);
+ }
+ highestBids.clear();
+ }
+ }
+ NexmarkUtils.info("larger price: %s", bid);
+ highestBids.add(bid);
+ }
+
+ @Override
+ protected void run() {
+ TimestampedValue<Event> timestampedEvent = nextInput();
+ if (timestampedEvent == null) {
+ // Capture all remaining bids in results.
+ retireWindow(lastTimestamp);
+ allDone();
+ return;
+ }
+
+ Event event = timestampedEvent.getValue();
+ if (event.bid == null) {
+ // Ignore non-bid events.
+ return;
+ }
+ lastTimestamp = timestampedEvent.getTimestamp();
+ Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
+ Duration.standardSeconds(configuration.windowSizeSec), lastTimestamp);
+ if (!newWindowStart.equals(windowStart)) {
+ // Capture highest priced bids in current window and retire it.
+ retireWindow(lastTimestamp);
+ windowStart = newWindowStart;
+ }
+ // Keep only the highest bids.
+ captureBid(event.bid);
+ }
+ }
+
+ public Query7Model(NexmarkConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public AbstractSimulator<?, ?> simulator() {
+ return new Simulator(configuration);
+ }
+
+ @Override
+ protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+ return toValueOrder(itr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
new file mode 100644
index 0000000..fa3dd86
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.IdNameReserve;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query 8, 'Monitor New Users'. Selects people who entered the system and created auctions within
+ * the same window. The original query uses 12-hour windows, updated every 12 hours. In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(P.id, P.name, A.reserve)
+ * FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
+ * WHERE P.id = A.seller;
+ * </pre>
+ *
+ * <p>Here both inputs use fixed windows of {@code configuration.windowSizeSec} seconds instead,
+ * which makes the query more dynamic and easier to test.
+ */
+public class Query8 extends NexmarkQuery {
+ /** Creates Query 8 for the given configuration. */
+ public Query8(NexmarkConfiguration configuration) {
+ super(configuration, "Query8");
+ }
+
+ private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) {
+ Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
+
+ // New people, windowed and keyed by person id.
+ PCollection<Person> newPersons = events.apply(JUST_NEW_PERSONS);
+ PCollection<KV<Long, Person>> personsById = newPersons
+ .apply("Query8.WindowPersons", Window.<Person>into(FixedWindows.of(windowSize)))
+ .apply("PersonById", PERSON_BY_ID);
+
+ // New auctions, windowed and keyed by the id of their seller.
+ PCollection<Auction> newAuctions = events.apply(JUST_NEW_AUCTIONS);
+ PCollection<KV<Long, Auction>> auctionsBySeller = newAuctions
+ .apply("Query8.WindowAuctions", Window.<Auction>into(FixedWindows.of(windowSize)))
+ .apply("AuctionBySeller", AUCTION_BY_SELLER);
+
+ // Co-group people and their auctions by id within each window.
+ PCollection<KV<Long, CoGbkResult>> joined = KeyedPCollectionTuple
+ .of(PERSON_TAG, personsById)
+ .and(AUCTION_TAG, auctionsBySeller)
+ .apply(CoGroupByKey.<Long>create());
+
+ // Project the person id, name and auction reserve for each joined auction.
+ return joined.apply(name + ".Select",
+ ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ CoGbkResult group = c.element().getValue();
+ Person person = group.getOnly(PERSON_TAG, null);
+ if (person == null) {
+ // No person with this id was created in the window; nothing to report.
+ return;
+ }
+ for (Auction auction : group.getAll(AUCTION_TAG)) {
+ c.output(new IdNameReserve(person.id, person.name, auction.reserve));
+ }
+ }
+ }));
+ }
+
+ /** Builds the typed pipeline and widens it to {@link KnownSize}. */
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ PCollection<IdNameReserve> newUsers = applyTyped(events);
+ return NexmarkUtils.castToKnownSize(name, newUsers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
new file mode 100644
index 0000000..351cef7
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query8Model.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.IdNameReserve;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A direct implementation of {@link Query8}.
+ */
+public class Query8Model extends NexmarkQueryModel implements Serializable {
+ /**
+ * Simulator for query 8: an incremental join of new persons with new auctions (on seller id)
+ * within fixed windows of {@code configuration.windowSizeSec} seconds. Bid events are skipped.
+ */
+ private class Simulator extends AbstractSimulator<Event, IdNameReserve> {
+ /** Persons created in the current window, by person id. */
+ private final Map<Long, Person> newPersons;
+
+ /** Auctions created in the current window whose seller has not yet appeared, by seller id. */
+ private final Multimap<Long, Auction> newAuctions;
+
+ /** Start of the window currently being accumulated. */
+ private Instant windowStart;
+
+ public Simulator(NexmarkConfiguration configuration) {
+ super(NexmarkUtils.standardEventIterator(configuration));
+ newPersons = new HashMap<>();
+ newAuctions = ArrayListMultimap.create();
+ windowStart = NexmarkUtils.BEGINNING_OF_TIME;
+ }
+
+ /** Log and forget every person captured in the window being retired. */
+ private void retirePersons() {
+ for (Person retired : newPersons.values()) {
+ NexmarkUtils.info("retire: %s", retired);
+ }
+ newPersons.clear();
+ }
+
+ /** Log and forget every still-unjoined auction captured in the window being retired. */
+ private void retireAuctions() {
+ for (Auction retired : newAuctions.values()) {
+ NexmarkUtils.info("retire: %s", retired);
+ }
+ newAuctions.clear();
+ }
+
+ /** Record one joined (person, auction) pair as a result at {@code timestamp}. */
+ private void emit(Auction auction, Person person, Instant timestamp) {
+ IdNameReserve joined = new IdNameReserve(person.id, person.name, auction.reserve);
+ addResult(TimestampedValue.of(joined, timestamp));
+ }
+
+ /** Join a new auction with its seller if already seen in this window, else buffer it. */
+ private void joinNewAuction(Auction auction, Instant timestamp) {
+ Person seller = newPersons.get(auction.seller);
+ if (seller == null) {
+ // The seller may still appear later in this window.
+ newAuctions.put(auction.seller, auction);
+ return;
+ }
+ emit(auction, seller, timestamp);
+ }
+
+ /** Join a new person with any buffered auctions they sell, then remember the person. */
+ private void joinNewPerson(Person person, Instant timestamp) {
+ for (Auction auction : newAuctions.get(person.id)) {
+ emit(auction, person, timestamp);
+ }
+ // Those auctions are now joined and will never be needed again.
+ newAuctions.removeAll(person.id);
+ newPersons.put(person.id, person);
+ }
+
+ @Override
+ public void run() {
+ TimestampedValue<Event> next = nextInput();
+ if (next == null) {
+ allDone();
+ return;
+ }
+
+ Event event = next.getValue();
+ if (event.bid != null) {
+ // Bids play no part in this query; move on to the next event.
+ return;
+ }
+
+ Instant timestamp = next.getTimestamp();
+ Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
+ Instant currentWindowStart = windowStart(windowSize, windowSize, timestamp);
+ if (!currentWindowStart.equals(windowStart)) {
+ // The event opens a new window: drop everything gathered for the previous one.
+ retirePersons();
+ retireAuctions();
+ windowStart = currentWindowStart;
+ }
+
+ if (event.newAuction != null) {
+ joinNewAuction(event.newAuction, timestamp);
+ } else {
+ // Neither a bid nor an auction, so the event introduces a new person.
+ joinNewPerson(event.newPerson, timestamp);
+ }
+ }
+ }
+
+ public Query8Model(NexmarkConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public AbstractSimulator<?, ?> simulator() {
+ Simulator simulator = new Simulator(configuration);
+ return simulator;
+ }
+
+ @Override
+ protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+ Collection<String> values = toValue(itr);
+ return values;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
new file mode 100644
index 0000000..5f11e4e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but
+ * handy for testing. See {@link WinningBids} for the details.
+ */
+public class Query9 extends NexmarkQuery {
+ public Query9(NexmarkConfiguration configuration) {
+ super(configuration, "Query9");
+ }
+
+ private PCollection<AuctionBid> applyTyped(PCollection<Event> events) {
+ return events.apply(new WinningBids(name, configuration));
+ }
+
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
new file mode 100644
index 0000000..48d792e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query9Model.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query9}.
+ */
+public class Query9Model extends NexmarkQueryModel implements Serializable {
+ public Query9Model(NexmarkConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ public AbstractSimulator<?, ?> simulator() {
+ return new WinningBidsSimulator(configuration);
+ }
+
+ @Override
+ protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+ return toValue(itr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
new file mode 100644
index 0000000..816a81f
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBids.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
+
+/**
+ * A transform to find the winning bid for each closed auction. In pseudo CQL syntax:
+ *
+ * <pre>{@code
+ * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
+ * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
+ * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
+ * GROUP BY A.id
+ * }</pre>
+ *
+ * <p>We will also check that the winning bid is above the auction reserve. Note that
+ * we ignore the auction opening bid value since it has no impact on which bid eventually wins,
+ * if any.
+ *
+ * <p>Our implementation will use a custom windowing function in order to bring bids and
+ * auctions together without requiring global state.
+ */
+public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
+  /** Windows for open auctions and bids. */
+  private static class AuctionOrBidWindow extends IntervalWindow {
+    /** Id of auction this window is for. */
+    public final long auction;
+
+    /**
+     * True if this window represents an actual auction, and thus has a start/end
+     * time matching that of the auction. False if this window represents a bid, and
+     * thus has a provisional, finite interval starting at the bid (see {@link #forBid}).
+     */
+    public final boolean isAuctionWindow;
+
+    /** For avro only. */
+    private AuctionOrBidWindow() {
+      super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
+      auction = 0;
+      isAuctionWindow = false;
+    }
+
+    private AuctionOrBidWindow(
+        Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
+      super(start, end);
+      this.auction = auctionId;
+      this.isAuctionWindow = isAuctionWindow;
+    }
+
+    /** Return an auction window for {@code auction}, spanning [timestamp, auction expiry). */
+    public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
+      return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
+    }
+
+    /**
+     * Return a bid window for {@code bid}. It should later be merged into
+     * the corresponding auction window. However, it is possible this bid is for an already
+     * expired auction, or for an auction which the system has not yet seen. So we
+     * give the bid a bit of wiggle room in its interval.
+     */
+    public static AuctionOrBidWindow forBid(
+        long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
+      // At this point we don't know which auctions are still valid, and the bid may
+      // be for an auction which won't start until some unknown time in the future
+      // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
+      // A real system would atomically reconcile bids and auctions by a separate mechanism.
+      // If we give bids an unbounded window it is possible a bid for an auction which
+      // has already expired would cause the system watermark to stall, since that window
+      // would never be retired.
+      // Instead, we will just give the bid a finite window which expires at
+      // the upper bound of auctions assuming the auction starts at the same time as the bid,
+      // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
+      return new AuctionOrBidWindow(
+          timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
+    }
+
+    /** Is this an auction window? */
+    public boolean isAuctionWindow() {
+      return isAuctionWindow;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
+          start(), end(), auction, isAuctionWindow);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+      AuctionOrBidWindow that = (AuctionOrBidWindow) o;
+      return (isAuctionWindow == that.isAuctionWindow) && (auction == that.auction);
+    }
+
+    @Override
+    public int hashCode() {
+      // Include the interval: bid windows for the same auction differ only in start/end, and
+      // would otherwise all share one hash. Consistent with equals, which requires super.equals.
+      return Objects.hash(super.hashCode(), isAuctionWindow, auction);
+    }
+  }
+
+  /**
+   * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow}, followed by the auction
+   * id (var-long) and the {@code isAuctionWindow} flag (var-int, 1 or 0).
+   */
+  private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
+    private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
+    private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
+    private static final Coder<Long> ID_CODER = VarLongCoder.of();
+    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+    @JsonCreator
+    public static AuctionOrBidWindowCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(AuctionOrBidWindow window, OutputStream outStream)
+        throws IOException, CoderException {
+      SUPER_CODER.encode(window, outStream);
+      ID_CODER.encode(window.auction, outStream);
+      INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream);
+    }
+
+    @Override
+    public AuctionOrBidWindow decode(InputStream inStream)
+        throws IOException, CoderException {
+      IntervalWindow superWindow = SUPER_CODER.decode(inStream);
+      long auction = ID_CODER.decode(inStream);
+      boolean isAuctionWindow = INT_CODER.decode(inStream) != 0;
+      return new AuctionOrBidWindow(
+          superWindow.start(), superWindow.end(), auction, isAuctionWindow);
+    }
+
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
+  }
+
+  /**
+   * Assigns new auctions to auction windows and bids to provisional bid windows, then merges
+   * each bid window into the window of its auction when the bid starts before the auction ends.
+   */
+  private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
+    /** Expected duration of auctions in ms. */
+    private final long expectedAuctionDurationMs;
+
+    public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
+      this.expectedAuctionDurationMs = expectedAuctionDurationMs;
+    }
+
+    @Override
+    public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
+      Event event = c.element();
+      if (event.newAuction != null) {
+        // Assign auctions to an auction window which expires at the auction's close.
+        return Collections
+            .singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
+      } else if (event.bid != null) {
+        // Assign bids to a temporary bid window which will later be merged into the appropriate
+        // auction window.
+        return Collections.singletonList(
+            AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
+      } else {
+        // Don't assign people to any window. They will thus be dropped.
+        return Collections.emptyList();
+      }
+    }
+
+    @Override
+    public void mergeWindows(MergeContext c) throws Exception {
+      // Split and index the auction and bid windows by auction id.
+      Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>();
+      Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>();
+      for (AuctionOrBidWindow window : c.windows()) {
+        if (window.isAuctionWindow()) {
+          idToTrueAuctionWindow.put(window.auction, window);
+        } else {
+          List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction);
+          if (bidWindows == null) {
+            bidWindows = new ArrayList<>();
+            idToBidAuctionWindows.put(window.auction, bidWindows);
+          }
+          bidWindows.add(window);
+        }
+      }
+
+      // Merge all 'bid' windows into their corresponding 'auction' window, provided the
+      // auction has not expired.
+      for (Map.Entry<Long, AuctionOrBidWindow> entry : idToTrueAuctionWindow.entrySet()) {
+        long auction = entry.getKey();
+        AuctionOrBidWindow auctionWindow = entry.getValue();
+        List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
+        if (bidWindows != null) {
+          List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
+          for (AuctionOrBidWindow bidWindow : bidWindows) {
+            if (bidWindow.start().isBefore(auctionWindow.end())) {
+              toBeMerged.add(bidWindow);
+            }
+            // else: This bid window will remain until its expire time, at which point it
+            // will expire without ever contributing to an output.
+          }
+          if (!toBeMerged.isEmpty()) {
+            toBeMerged.add(auctionWindow);
+            c.merge(toBeMerged, auctionWindow);
+          }
+        }
+      }
+    }
+
+    /**
+     * Merging does not depend on {@code expectedAuctionDurationMs}, so any two instances of
+     * this class merge identically.
+     */
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof AuctionOrBidWindowFn;
+    }
+
+    @Override
+    public Coder<AuctionOrBidWindow> windowCoder() {
+      return AuctionOrBidWindowCoder.of();
+    }
+
+    @Override
+    public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
+      throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
+    }
+
+    /**
+     * Below we will GBK auctions and bids on their auction ids. Then we will reduce those
+     * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at
+     * least one valid bid. We would like those output pairs to have a timestamp of the auction's
+     * expiry (since that's the earliest we know for sure we have the correct winner). We would
+     * also like to make sure that winning results are available to following stages at the
+     * auction's expiry.
+     *
+     * <p>Each result of the GBK will have a timestamp of the min of the result of this object's
+     * {@code getOutputTime} over all records which end up in one of its iterables. Thus we get
+     * the desired behavior if we ignore each record's timestamp and always return the auction
+     * window's 'maxTimestamp', which will correspond to the auction's expiry.
+     *
+     * <p>In contrast, if this object's {@code getOutputTime} were to return 'inputTimestamp'
+     * (the usual implementation), then each GBK record will take as its timestamp the minimum of
+     * the timestamps of all bids and auctions within it, which will always be the auction's
+     * timestamp. An auction which expires well into the future would thus hold up the watermark
+     * of the GBK results until that auction expired. That in turn would hold up all winning pairs.
+     */
+    @Override
+    public Instant getOutputTime(
+        Instant inputTimestamp, AuctionOrBidWindow window) {
+      return window.maxTimestamp();
+    }
+
+    /**
+     * Value equality on the expected auction duration. Needed so that
+     * {@link WinningBids#equals} and {@link WinningBids#hashCode}, which delegate here, are
+     * not identity-based.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      AuctionOrBidWindowFn that = (AuctionOrBidWindowFn) o;
+      return expectedAuctionDurationMs == that.expectedAuctionDurationMs;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(expectedAuctionDurationMs);
+    }
+  }
+
+  private final AuctionOrBidWindowFn auctionOrBidWindowFn;
+
+  /**
+   * Creates the transform. The expected auction duration, used to size provisional bid
+   * windows, is derived from the slowest configured event rate, the proportion of auction
+   * events among all events, and the number of in-flight auctions.
+   */
+  public WinningBids(String name, NexmarkConfiguration configuration) {
+    super(name);
+    // What's the expected auction time (when the system is running at the lowest event rate).
+    long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
+        configuration.firstEventRate, configuration.nextEventRate,
+        configuration.rateUnit, configuration.numEventGenerators);
+    long longestDelayUs = 0;
+    for (long interEventDelayU : interEventDelayUs) {
+      longestDelayUs = Math.max(longestDelayUs, interEventDelayU);
+    }
+    // Adjust for proportion of auction events amongst all events.
+    longestDelayUs =
+        (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
+            / GeneratorConfig.AUCTION_PROPORTION;
+    // Adjust for number of in-flight auctions.
+    longestDelayUs = longestDelayUs * configuration.numInFlightAuctions;
+    // Round microseconds up to whole milliseconds.
+    long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000;
+    NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
+    auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
+  }
+
+  @Override
+  public PCollection<AuctionBid> expand(PCollection<Event> events) {
+    // Window auctions and bids into custom auction windows. New people events will be discarded.
+    // This will allow us to bring bids and auctions together irrespective of how long
+    // each auction is open for.
+    events = events.apply("Window", Window.into(auctionOrBidWindowFn));
+
+    // Key auctions by their id.
+    PCollection<KV<Long, Auction>> auctionsById =
+        events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
+            .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
+
+    // Key bids by their auction id.
+    PCollection<KV<Long, Bid>> bidsByAuctionId =
+        events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
+
+    // Find the highest price valid bid for each closed auction.
+    return
+        // Join auctions and bids.
+        KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
+            .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
+            .apply(CoGroupByKey.<Long>create())
+            // Filter and select.
+            .apply(name + ".Join",
+                ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
+                  private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
+                  private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
+                  private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
+
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    Auction auction =
+                        c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
+                    if (auction == null) {
+                      // We have bids without a matching auction. Give up.
+                      noAuctionCounter.inc();
+                      return;
+                    }
+                    // Find the current winning bid for auction.
+                    // The earliest bid with the maximum price above the reserve wins.
+                    Bid bestBid = null;
+                    for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
+                      // Bids too late for their auction will have been
+                      // filtered out by the window merge function.
+                      checkState(bid.dateTime < auction.expires,
+                          "bid %s is not before the expiry of auction %s", bid, auction);
+                      if (bid.price < auction.reserve) {
+                        // Bid price is below auction reserve.
+                        underReserveCounter.inc();
+                        continue;
+                      }
+
+                      if (bestBid == null
+                          || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
+                        bestBid = bid;
+                      }
+                    }
+                    if (bestBid == null) {
+                      // We don't have any valid bids for auction.
+                      noValidBidsCounter.inc();
+                      return;
+                    }
+                    c.output(new AuctionBid(auction, bestBid));
+                  }
+                }
+            ));
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(auctionOrBidWindowFn);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    WinningBids that = (WinningBids) o;
+    return auctionOrBidWindowFn.equals(that.auctionOrBidWindowFn);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
new file mode 100644
index 0000000..69b64c0
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/WinningBidsSimulator.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A simulator of the {@code WinningBids} query. It consumes the standard event iterator for
+ * the configuration, tracks the best bid at or above reserve for each open auction, and emits
+ * an {@link AuctionBid} for every expired auction that has such a bid, timestamped at the
+ * auction's expiry.
+ */
+public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
+  /** Auctions currently still open, indexed by auction id. */
+  private final Map<Long, Auction> openAuctions;
+
+  /** The ids of auctions known to be closed (added when an auction is retired). */
+  private final Set<Long> closedAuctions;
+
+  /** Current best valid bids for open auctions, indexed by auction id. */
+  private final Map<Long, Bid> bestBids;
+
+  /** Bids for auctions we haven't seen yet. */
+  private final List<Bid> bidsWithoutAuctions;
+
+  /**
+   * Timestamp of last new auction or bid event (ms since epoch).
+   */
+  private long lastTimestamp;
+
+  public WinningBidsSimulator(NexmarkConfiguration configuration) {
+    super(NexmarkUtils.standardEventIterator(configuration));
+    openAuctions = new TreeMap<>();
+    closedAuctions = new TreeSet<>();
+    bestBids = new TreeMap<>();
+    bidsWithoutAuctions = new ArrayList<>();
+    lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+  }
+
+  /**
+   * Try to account for {@code bid} in state. Return true if bid has now been
+   * accounted for: it was ignored because its auction is closed or its price is below
+   * reserve, or it was compared against the auction's current best bid. Return false if the
+   * bid's auction has not been seen yet.
+   */
+  private boolean captureBestBid(Bid bid, boolean shouldLog) {
+    if (closedAuctions.contains(bid.auction)) {
+      // Ignore bids for known, closed auctions.
+      if (shouldLog) {
+        NexmarkUtils.info("closed auction: %s", bid);
+      }
+      return true;
+    }
+    Auction auction = openAuctions.get(bid.auction);
+    if (auction == null) {
+      // We don't have an auction for this bid yet, so can't determine if it is
+      // winning or not.
+      if (shouldLog) {
+        NexmarkUtils.info("pending auction: %s", bid);
+      }
+      return false;
+    }
+    if (bid.price < auction.reserve) {
+      // Bid price is too low.
+      if (shouldLog) {
+        NexmarkUtils.info("below reserve: %s", bid);
+      }
+      return true;
+    }
+    Bid existingBid = bestBids.get(bid.auction);
+    if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
+      // We've found a (new) best bid for a known auction.
+      bestBids.put(bid.auction, bid);
+      if (shouldLog) {
+        NexmarkUtils.info("new winning bid: %s", bid);
+      }
+    } else {
+      if (shouldLog) {
+        NexmarkUtils.info("ignoring low bid: %s", bid);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Try to match bids without auctions to auctions. Bids that are now accounted for
+   * (see {@link #captureBestBid}) are removed from the pending list.
+   */
+  private void flushBidsWithoutAuctions() {
+    Iterator<Bid> itr = bidsWithoutAuctions.iterator();
+    while (itr.hasNext()) {
+      Bid bid = itr.next();
+      if (captureBestBid(bid, false)) {
+        NexmarkUtils.info("bid now accounted for: %s", bid);
+        itr.remove();
+      }
+    }
+  }
+
+  /**
+   * Return the next winning bid for an expired auction relative to {@code timestamp}.
+   * Return null if no more winning bids, in which case all expired auctions will
+   * have been removed from our state. Retire auctions in order of expire time.
+   *
+   * <p>Each retired auction is recorded in {@code closedAuctions} so that later bids for it
+   * are ignored instead of being parked indefinitely in {@code bidsWithoutAuctions}, and its
+   * best bid is dropped from {@code bestBids}.
+   */
+  @Nullable
+  private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
+    // Group the ids of all expired auctions by expiry time (TreeMap keeps them sorted).
+    Map<Long, List<Long>> toBeRetired = new TreeMap<>();
+    for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
+      if (entry.getValue().expires <= timestamp) {
+        List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires);
+        if (idsAtTime == null) {
+          idsAtTime = new ArrayList<>();
+          toBeRetired.put(entry.getValue().expires, idsAtTime);
+        }
+        idsAtTime.add(entry.getKey());
+      }
+    }
+    for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) {
+      for (long id : entry.getValue()) {
+        Auction auction = openAuctions.remove(id);
+        NexmarkUtils.info("retiring auction: %s", auction);
+        closedAuctions.add(id);
+        Bid bestBid = bestBids.remove(id);
+        if (bestBid != null) {
+          TimestampedValue<AuctionBid> result =
+              TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
+          NexmarkUtils.info("winning: %s", result);
+          // Any other expired auctions are still open and will be retired on a later call.
+          return result;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Performs one simulation step: emit at most one winning bid for auctions expired as of the
+   * last seen event, otherwise consume the next input event (or, once input is exhausted,
+   * flush all remaining auctions and then signal completion).
+   */
+  @Override
+  protected void run() {
+    if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+      // We may have finally seen the auction a bid was intended for.
+      flushBidsWithoutAuctions();
+      TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp);
+      if (result != null) {
+        addResult(result);
+        return;
+      }
+    }
+
+    TimestampedValue<Event> timestampedEvent = nextInput();
+    if (timestampedEvent == null) {
+      // No more events. Flush any still open auctions.
+      TimestampedValue<AuctionBid> result =
+          nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+      if (result == null) {
+        // We are done.
+        allDone();
+        return;
+      }
+      addResult(result);
+      return;
+    }
+
+    Event event = timestampedEvent.getValue();
+    if (event.newPerson != null) {
+      // Ignore new person events.
+      return;
+    }
+
+    lastTimestamp = timestampedEvent.getTimestamp().getMillis();
+    if (event.newAuction != null) {
+      // Add this new open auction to our state.
+      openAuctions.put(event.newAuction.id, event.newAuction);
+    } else {
+      if (!captureBestBid(event.bid, true)) {
+        // We don't know what to do with this bid yet.
+        NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
+        bidsWithoutAuctions.add(event.bid);
+      }
+    }
+    // Keep looking for winning bids.
+  }
+}