You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:21 UTC
[13/55] [abbrv] beam git commit: Refactor classes into packages
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3.java
deleted file mode 100644
index 71969c4..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Query 3, 'Local Item Suggestion'. Who is selling in OR, ID or CA in category 10, and for what
- * auction ids? In CQL syntax:
- *
- * <pre>
- * SELECT Istream(P.name, P.city, P.state, A.id)
- * FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
- * WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category
- * = 10;
- * </pre>
- *
- * <p>We'll implement this query to allow 'new auction' events to come before the 'new person'
- * events for the auction seller. Those auctions will be stored until the matching person is
- * seen. Then all subsequent auctions for a person will use the stored person record.
- *
- * <p>A real system would use an external system to maintain the id-to-person association.
- */
-class Query3 extends NexmarkQuery {
- private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
-// private static final StateContext GLOBAL_NAMESPACE = StateContexts.global();
- private static final StateSpec<Object, ValueState<List<Auction>>> AUCTION_LIST_CODED_TAG =
- StateSpecs.value(ListCoder.of(Auction.CODER));
- private static final StateSpec<Object, ValueState<Person>> PERSON_CODED_TAG =
- StateSpecs.value(Person.CODER);
-
- /**
- * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair
- * at a time.
- *
- * <p>We know a person may submit any number of auctions. Thus a new person event must have the
- * person record stored in persistent state in order to match future auctions by that person.
- *
- * <p>However we know that each auction is associated with at most one person, so we only need
- * to store auction records in persistent state until we have seen the corresponding person
- * record. And of course we may have already seen that record.
- *
- * <p>NOTE: every persistent-state read and write in {@code processElement} is currently
- * commented out, so {@code existingPerson} and both {@code pendingAuctions} reads are always
- * {@code null}. As written, an auction is only joined with a person that arrives in the same
- * {@link CoGbkResult}; auctions that arrive without their person are counted by the
- * {@code newAuction} aggregator, collected into a local list, and then dropped.
- */
- private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
- private final Aggregator<Long, Long> newAuctionCounter =
- createAggregator("newAuction", Sum.ofLongs());
- private final Aggregator<Long, Long> newPersonCounter =
- createAggregator("newPerson", Sum.ofLongs());
- private final Aggregator<Long, Long> newNewOutputCounter =
- createAggregator("newNewOutput", Sum.ofLongs());
- private final Aggregator<Long, Long> newOldOutputCounter =
- createAggregator("newOldOutput", Sum.ofLongs());
- private final Aggregator<Long, Long> oldNewOutputCounter =
- createAggregator("oldNewOutput", Sum.ofLongs());
- public final Aggregator<Long, Long> fatalCounter = createAggregator("fatal", Sum.ofLongs());
-
- @ProcessElement
- public void processElement(ProcessContext c) throws IOException {
- //TODO: This was written against the internal state API and needs reworking (original note was cut off).
- //TODO Ismael: broken, since there is no access to state (the state calls below are commented out).
- // We would *almost* implement this by rewindowing into the global window and
- // running a combiner over the result. The combiner's accumulator would be the
- // state we use below. However, combiners cannot emit intermediate results, thus
- // we need to wait for the pending ReduceFn API.
-// StateInternals<?> stateInternals = c.windowingInternals().stateInternals();
-// ValueState<Person> personState = stateInternals.state(GLOBAL_NAMESPACE, PERSON_CODED_TAG);
-// Person existingPerson = personState.read();
- Person existingPerson = null;
- if (existingPerson != null) {
- // We've already seen the new person event for this person id.
- // We can join with any new auctions on-the-fly without needing any
- // additional persistent state.
- for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.addValue(1L);
- newOldOutputCounter.addValue(1L);
- c.output(KV.of(newAuction, existingPerson));
- }
- return;
- }
-
-// ValueState<List<Auction>> auctionsState =
-// stateInternals.state(GLOBAL_NAMESPACE, AUCTION_LIST_CODED_TAG);
- Person theNewPerson = null;
- for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
- if (theNewPerson == null) {
- theNewPerson = newPerson;
- } else {
- if (theNewPerson.equals(newPerson)) {
- LOG.error("**** duplicate person {} ****", theNewPerson);
- } else {
- LOG.error("**** conflicting persons {} and {} ****", theNewPerson, newPerson);
- }
- fatalCounter.addValue(1L);
- continue;
- }
- newPersonCounter.addValue(1L);
- // We've now seen the person for this person id so can flush any
- // pending auctions for the same seller id.
- List<Auction> pendingAuctions = null; //auctionsState.read();
- if (pendingAuctions != null) {
- for (Auction pendingAuction : pendingAuctions) {
- oldNewOutputCounter.addValue(1L);
- c.output(KV.of(pendingAuction, newPerson));
- }
-// auctionsState.clear();
- }
- // Also deal with any new auctions.
- for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.addValue(1L);
- newNewOutputCounter.addValue(1L);
- c.output(KV.of(newAuction, newPerson));
- }
- // Remember this person for any future auctions.
-
-// personState.write(newPerson);
- }
- if (theNewPerson != null) {
- return;
- }
-
- // We'll need to remember the auctions until we see the corresponding
- // new person event.
- List<Auction> pendingAuctions = null; //auctionsState.read();
- if (pendingAuctions == null) {
- pendingAuctions = new ArrayList<>();
- }
- for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.addValue(1L);
- pendingAuctions.add(newAuction);
- }
-// auctionsState.write(pendingAuctions);
- }
- }
-
- private final JoinDoFn joinDoFn = new JoinDoFn();
-
- /** Creates the query, passing the fixed name {@code "Query3"} to the superclass constructor. */
- public Query3(NexmarkConfiguration configuration) {
-   super(configuration, "Query3");
- }
-
- @Override
- @Nullable
- public Aggregator<Long, Long> getFatalCount() {
- return joinDoFn.fatalCounter;
- }
-
- /**
- * Builds the typed query 3 pipeline: window the events, select category-10 auctions keyed by
- * seller and OR/ID/CA persons keyed by id, co-group the two by key, join them with
- * {@code joinDoFn}, and project each joined pair to a {@link NameCityStateId}.
- */
- private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
-   // Batch into incremental results windows.
-   Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
-   PCollection<Event> windowedEvents =
-       events.apply(Window.<Event>into(FixedWindows.of(windowSize)));
-
-   // We only want auctions in category 10.
-   SerializableFunction<Auction, Boolean> inCategoryTen =
-       new SerializableFunction<Auction, Boolean>() {
-         @Override
-         public Boolean apply(Auction auction) {
-           return auction.category == 10;
-         }
-       };
-
-   // We only want people in OR, ID, CA.
-   SerializableFunction<Person, Boolean> inWantedState =
-       new SerializableFunction<Person, Boolean>() {
-         @Override
-         public Boolean apply(Person person) {
-           String state = person.state;
-           return state.equals("OR") || state.equals("ID") || state.equals("CA");
-         }
-       };
-
-   // Turn each joined (auction, person) pair into the output record.
-   DoFn<KV<Auction, Person>, NameCityStateId> projectFn =
-       new DoFn<KV<Auction, Person>, NameCityStateId>() {
-         @ProcessElement
-         public void processElement(ProcessContext c) {
-           KV<Auction, Person> pair = c.element();
-           Auction auction = pair.getKey();
-           Person person = pair.getValue();
-           c.output(new NameCityStateId(
-               person.name, person.city, person.state, auction.id));
-         }
-       };
-
-   // New auction events, filtered by category and keyed by their seller id.
-   PCollection<KV<Long, Auction>> auctionsBySellerId =
-       windowedEvents
-           .apply(JUST_NEW_AUCTIONS)
-           .apply(name + ".InCategory", Filter.by(inCategoryTen))
-           .apply("AuctionBySeller", AUCTION_BY_SELLER);
-
-   // New person events, filtered by state and keyed by their id.
-   PCollection<KV<Long, Person>> personsById =
-       windowedEvents
-           .apply(JUST_NEW_PERSONS)
-           .apply(name + ".InState", Filter.by(inWantedState))
-           .apply("PersonById", PERSON_BY_ID);
-
-   // Group auctions and persons by person id.
-   PCollection<KV<Long, CoGbkResult>> groupedByPersonId =
-       KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
-           .and(PERSON_TAG, personsById)
-           .apply(CoGroupByKey.<Long>create());
-
-   // Join auctions and people, then project what we want.
-   return groupedByPersonId
-       .apply(name + ".Join", ParDo.of(joinDoFn))
-       .apply(name + ".Project", ParDo.of(projectFn));
- }
-
- /** Runs {@code applyTyped} and hands its output to {@code NexmarkUtils.castToKnownSize}. */
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-   PCollection<NameCityStateId> typedResults = applyTyped(events);
-   return NexmarkUtils.castToKnownSize(name, typedResults);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java
deleted file mode 100644
index 85796ee..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query3Model.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query3}.
- */
-public class Query3Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 3.
- */
- private class Simulator extends AbstractSimulator<Event, NameCityStateId> {
- /** Auctions, indexed by seller id. */
- private final Multimap<Long, Auction> newAuctions;
-
- /** Persons, indexed by id. */
- private final Map<Long, Person> newPersons;
-
- /** Starts from the standard event iterator with no remembered auctions or persons. */
- public Simulator(NexmarkConfiguration configuration) {
-   super(NexmarkUtils.standardEventIterator(configuration));
-   newAuctions = ArrayListMultimap.create();
-   newPersons = new HashMap<>();
- }
-
- /**
- * Record the join of {@code auction} and {@code person} as a {@link NameCityStateId} result
- * stamped with {@code timestamp}.
- */
- private void addResult(Auction auction, Person person, Instant timestamp) {
-   NameCityStateId joined =
-       new NameCityStateId(person.name, person.city, person.state, auction.id);
-   addResult(TimestampedValue.of(joined, timestamp));
- }
-
- /**
- * Consume one input event. Bids are ignored. A category-10 auction is joined with its
- * remembered seller, or remembered itself when the seller has not been seen. A person in OR,
- * ID or CA is joined with every remembered auction for their id and then remembered.
- */
- @Override
- protected void run() {
-   TimestampedValue<Event> next = nextInput();
-   if (next == null) {
-     allDone();
-     return;
-   }
-
-   Event event = next.getValue();
-   if (event.bid != null) {
-     // Ignore bid events.
-     return;
-   }
-
-   Instant timestamp = next.getTimestamp();
-
-   Auction auction = event.newAuction;
-   if (auction != null) {
-     // Only want auctions in category 10.
-     if (auction.category != 10) {
-       return;
-     }
-     Person seller = newPersons.get(auction.seller);
-     if (seller == null) {
-       // Remember auction for future new person event.
-       newAuctions.put(auction.seller, auction);
-     } else {
-       // Join new auction with the already known person.
-       addResult(auction, seller, timestamp);
-     }
-     return;
-   }
-
-   // Neither a bid nor an auction, so this is handled as a new person event.
-   Person person = event.newPerson;
-   boolean inWantedState =
-       person.state.equals("OR") || person.state.equals("ID") || person.state.equals("CA");
-   if (!inWantedState) {
-     return;
-   }
-   // Join new person with existing auctions.
-   for (Auction pending : newAuctions.get(person.id)) {
-     addResult(pending, person, timestamp);
-   }
-   // We'll never need these auctions again.
-   newAuctions.removeAll(person.id);
-   // Remember person for future auctions.
-   newPersons.put(person.id, person);
- }
- }
-
- /** Creates the model, forwarding {@code configuration} to the superclass constructor. */
- public Query3Model(NexmarkConfiguration configuration) {
-   super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- /** Converts the results by delegating to {@code toValue}. */
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-   Collection<String> values = toValue(itr);
-   return values;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4.java
deleted file mode 100644
index b24410d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Mean;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 4, 'Average Price for a Category'. Select the average of the winning bid prices for all
- * closed auctions in each category. In CQL syntax:
- *
- * <pre>{@code
- * SELECT Istream(AVG(Q.final))
- * FROM Category C, (SELECT Rstream(MAX(B.price) AS final, A.category)
- * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- * GROUP BY A.id, A.category) Q
- * WHERE Q.category = C.id
- * GROUP BY C.id;
- * }</pre>
- *
- * <p>For extra spiciness our implementation differs slightly from the above:
- * <ul>
- * <li>We select both the average winning price and the category.
- * <li>We don't bother joining with a static category table, since its contents are never used.
- * <li>We only consider bids which are above the auction's reserve price.
- * <li>We accept the highest-price, earliest valid bid as the winner.
- * <li>We calculate the averages over a sliding window of size {@code windowSizeSec} and
- * period {@code windowPeriodSec}.
- * </ul>
- */
-class Query4 extends NexmarkQuery {
- private final Monitor<AuctionBid> winningBidsMonitor;
-
- /**
- * Creates the query under the name {@code "Query4"} and sets up the monitor that is applied to
- * the winning bids.
- */
- public Query4(NexmarkConfiguration configuration) {
-   super(configuration, "Query4");
-   String monitorName = name + ".WinningBids";
-   winningBidsMonitor = new Monitor<>(monitorName, "winning");
- }
-
- /**
- * Builds the typed query 4 pipeline: find and monitor the winning bids, key each winning price
- * by auction category, re-window into sliding windows, take the per-category mean, and project
- * each mean to a {@link CategoryPrice}.
- */
- private PCollection<CategoryPrice> applyTyped(PCollection<Event> events) {
-   Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
-   Duration windowPeriod = Duration.standardSeconds(configuration.windowPeriodSec);
-
-   // Find the winning bid for each closed auction, then monitor those winning bids.
-   PCollection<AuctionBid> monitoredWinningBids =
-       events
-           .apply(new WinningBids(name + ".WinningBids", configuration))
-           .apply(name + ".WinningBidsMonitor", winningBidsMonitor.getTransform());
-
-   // Key the winning bid price by the auction category.
-   PCollection<KV<Long, Long>> priceByCategory =
-       monitoredWinningBids.apply(name + ".Rekey",
-           ParDo.of(new DoFn<AuctionBid, KV<Long, Long>>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) {
-               AuctionBid winner = c.element();
-               c.output(KV.of(winner.auction.category, winner.bid.price));
-             }
-           }));
-
-   // Re-window so we can calculate a sliding average.
-   PCollection<KV<Long, Long>> slidingPriceByCategory =
-       priceByCategory.apply(
-           Window.<KV<Long, Long>>into(SlidingWindows.of(windowSize).every(windowPeriod)));
-
-   // Find the average of the winning bids for each category.
-   // Make sure we share the work for each category between workers.
-   PCollection<KV<Long, Double>> meanByCategory =
-       slidingPriceByCategory.apply(
-           Mean.<Long, Long>perKey().withHotKeyFanout(configuration.fanout));
-
-   // For testing against Query4Model, capture which results are 'final'.
-   return meanByCategory.apply(name + ".Project",
-       ParDo.of(new DoFn<KV<Long, Double>, CategoryPrice>() {
-         @ProcessElement
-         public void processElement(ProcessContext c) {
-           KV<Long, Double> mean = c.element();
-           c.output(new CategoryPrice(
-               mean.getKey(), Math.round(mean.getValue()), c.pane().isLast()));
-         }
-       }));
- }
-
- /** Runs {@code applyTyped} and hands its output to {@code NexmarkUtils.castToKnownSize}. */
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-   PCollection<CategoryPrice> typedResults = applyTyped(events);
-   return NexmarkUtils.castToKnownSize(name, typedResults);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java
deleted file mode 100644
index afab7e8..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query4Model.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
- * A direct implementation of {@link Query4}.
- */
-public class Query4Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 4.
- */
- private class Simulator extends AbstractSimulator<AuctionBid, CategoryPrice> {
- /** The prices and categories for all winning bids in the last window size. */
- private final List<TimestampedValue<CategoryPrice>> winningPricesByCategory;
-
- /** Timestamp of last result (ms since epoch). */
- private Instant lastTimestamp;
-
- /** When oldest active window starts. */
- private Instant windowStart;
-
- /** The last seen result for each category. */
- private final Map<Long, TimestampedValue<CategoryPrice>> lastSeenResults;
-
- /**
- * Reads its input from the results of a {@code WinningBidsSimulator} and starts with no
- * retained prices and no previously seen results.
- */
- public Simulator(NexmarkConfiguration configuration) {
-   super(new WinningBidsSimulator(configuration).results());
-   lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
-   windowStart = NexmarkUtils.BEGINNING_OF_TIME;
-   winningPricesByCategory = new ArrayList<>();
-   lastSeenResults = new TreeMap<>();
- }
-
- /**
- * For each category, emit the rounded mean price of the retained winning bids whose timestamp
- * is strictly before {@code end}. Each mean is added as an intermediate result stamped with
- * {@code lastTimestamp} and also recorded in {@code lastSeenResults}.
- */
- private void averages(Instant end) {
-   // Per category, in ascending category order: slot 0 is the bid count, slot 1 the price total.
-   Map<Long, long[]> statsByCategory = new TreeMap<>();
-   for (TimestampedValue<CategoryPrice> winner : winningPricesByCategory) {
-     if (winner.getTimestamp().isBefore(end)) {
-       CategoryPrice categoryPrice = winner.getValue();
-       long[] stats = statsByCategory.get(categoryPrice.category);
-       if (stats == null) {
-         stats = new long[2];
-         statsByCategory.put(categoryPrice.category, stats);
-       }
-       stats[0] += 1;
-       stats[1] += categoryPrice.price;
-     }
-   }
-
-   for (Map.Entry<Long, long[]> entry : statsByCategory.entrySet()) {
-     long category = entry.getKey();
-     long count = entry.getValue()[0];
-     long total = entry.getValue()[1];
-     long mean = Math.round((double) total / count);
-     TimestampedValue<CategoryPrice> result =
-         TimestampedValue.of(new CategoryPrice(category, mean, true), lastTimestamp);
-     addIntermediateResult(result);
-     lastSeenResults.put(category, result);
-   }
- }
-
- /**
- * Advance {@code windowStart} one window period at a time until it equals
- * {@code newWindowStart}. Before each step, emit the averages for the window being retired;
- * after each step, drop retained prices that are before the new {@code windowStart}. Once
- * nothing is retained, jump straight to {@code newWindowStart}.
- */
- private void prune(Instant newWindowStart) {
-   Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
-   Duration windowPeriod = Duration.standardSeconds(configuration.windowPeriodSec);
-   while (!newWindowStart.equals(windowStart)) {
-     averages(windowStart.plus(windowSize));
-     windowStart = windowStart.plus(windowPeriod);
-     for (Iterator<TimestampedValue<CategoryPrice>> retained =
-             winningPricesByCategory.iterator();
-         retained.hasNext();) {
-       TimestampedValue<CategoryPrice> candidate = retained.next();
-       if (candidate.getTimestamp().isBefore(windowStart)) {
-         retained.remove();
-       }
-     }
-     if (winningPricesByCategory.isEmpty()) {
-       windowStart = newWindowStart;
-     }
-   }
- }
-
- /**
- * Retain the winning bid as a non-final {@link CategoryPrice} (auction category and bid price)
- * stamped with {@code timestamp}.
- */
- private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
-   CategoryPrice winningPrice = new CategoryPrice(auction.category, bid.price, false);
-   winningPricesByCategory.add(TimestampedValue.of(winningPrice, timestamp));
- }
-
- /**
- * Consume one winning bid: retire the windows it closes and then retain it. When the input is
- * exhausted, retire everything up to {@code NexmarkUtils.END_OF_TIME}, add the last seen
- * result of every category as a result, and finish.
- */
- @Override
- protected void run() {
-   TimestampedValue<AuctionBid> next = nextInput();
-   if (next != null) {
-     lastTimestamp = next.getTimestamp();
-     Duration windowSize = Duration.standardSeconds(configuration.windowSizeSec);
-     Duration windowPeriod = Duration.standardSeconds(configuration.windowPeriodSec);
-     prune(windowStart(windowSize, windowPeriod, lastTimestamp));
-     AuctionBid winner = next.getValue();
-     captureWinningBid(winner.auction, winner.bid, lastTimestamp);
-     return;
-   }
-
-   // No more input: flush every window that is still open.
-   prune(NexmarkUtils.END_OF_TIME);
-   for (TimestampedValue<CategoryPrice> lastSeen : lastSeenResults.values()) {
-     addResult(lastSeen);
-   }
-   allDone();
- }
- }
-
- /** Creates the model, forwarding {@code configuration} to the superclass constructor. */
- public Query4Model(NexmarkConfiguration configuration) {
-   super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- /**
- * Keeps, for each category, the most recently iterated result whose {@code isLast} flag is
- * set. Asserts that every result is a {@link CategoryPrice}. The returned values are ordered
- * by ascending category.
- */
- @Override
- protected Iterable<TimestampedValue<KnownSize>> relevantResults(
-     Iterable<TimestampedValue<KnownSize>> results) {
-   // Find the last (in processing time) reported average price for each category.
-   Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
-   for (TimestampedValue<KnownSize> timestamped : results) {
-     KnownSize value = timestamped.getValue();
-     Assert.assertTrue("have CategoryPrice", value instanceof CategoryPrice);
-     CategoryPrice categoryPrice = (CategoryPrice) value;
-     if (!categoryPrice.isLast) {
-       continue;
-     }
-     finalAverages.put(
-         categoryPrice.category, TimestampedValue.of(value, timestamped.getTimestamp()));
-   }
-
-   return finalAverages.values();
- }
-
- /** Converts the results by delegating to {@code toValue}. */
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-   Collection<String> values = toValue(itr);
-   return values;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
deleted file mode 100644
index 2c9fb9b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 5, 'Hot Items'. Which auctions have seen the most bids in the last hour (updated every
- * minute). In CQL syntax:
- *
- * <pre>{@code
- * SELECT Rstream(auction)
- * FROM (SELECT B1.auction, count(*) AS num
- * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B1
- * GROUP BY B1.auction)
- * WHERE num >= ALL (SELECT count(*)
- * FROM Bid [RANGE 60 MINUTE SLIDE 1 MINUTE] B2
- * GROUP BY B2.auction);
- * }</pre>
- *
- * <p>To make things a bit more dynamic and easier to test we use much shorter windows, and
- * we'll also preserve the bid counts.
- */
-class Query5 extends NexmarkQuery {
- /** Creates the query, passing the fixed name {@code "Query5"} to the superclass constructor. */
- public Query5(NexmarkConfiguration configuration) {
-   super(configuration, "Query5");
- }
-
- private PCollection<AuctionCount> applyTyped(PCollection<Event> events) {
- return events
- // Only want the bid events.
- .apply(JUST_BIDS)
- // Window the bids into sliding windows.
- .apply(Window.<Bid>into(
- SlidingWindows.of(Duration.standardSeconds(configuration.windowSizeSec))
- .every(Duration.standardSeconds(configuration.windowPeriodSec))))
- // Project just the auction id.
- .apply("BidToAuction", BID_TO_AUCTION)
-
- // Count the number of bids per auction id.
- .apply(Count.<Long>perElement())
-
- // We'll want to keep all auctions with the maximal number of bids.
- // Start by lifting each into a singleton list.
- .apply(name + ".ToSingletons",
- ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(Arrays.asList(c.element().getKey()), c.element().getValue()));
- }
- }))
-
- // Keep only the auction ids with the most bids.
- .apply(
- Combine
- .globally(new Combine.BinaryCombineFn<KV<List<Long>, Long>>() {
- @Override
- public KV<List<Long>, Long> apply(
- KV<List<Long>, Long> left, KV<List<Long>, Long> right) {
- List<Long> leftBestAuctions = left.getKey();
- long leftCount = left.getValue();
- List<Long> rightBestAuctions = right.getKey();
- long rightCount = right.getValue();
- if (leftCount > rightCount) {
- return left;
- } else if (leftCount < rightCount) {
- return right;
- } else {
- List<Long> newBestAuctions = new ArrayList<>();
- newBestAuctions.addAll(leftBestAuctions);
- newBestAuctions.addAll(rightBestAuctions);
- return KV.of(newBestAuctions, leftCount);
- }
- }
- })
- .withoutDefaults()
- .withFanout(configuration.fanout))
-
- // Project into result.
- .apply(name + ".Select",
- ParDo.of(new DoFn<KV<List<Long>, Long>, AuctionCount>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- long count = c.element().getValue();
- for (long auction : c.element().getKey()) {
- c.output(new AuctionCount(auction, count));
- }
- }
- }));
- }
-
- /** Runs {@code applyTyped} and hands its output to {@code NexmarkUtils.castToKnownSize}. */
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-   PCollection<AuctionCount> typedResults = applyTyped(events);
-   return NexmarkUtils.castToKnownSize(name, typedResults);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java
deleted file mode 100644
index f8e466e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5Model.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query5}.
- */
-public class Query5Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 5.
- */
- private class Simulator extends AbstractSimulator<Event, AuctionCount> {
- /** Time of bids still contributing to open windows, indexed by their auction id. */
- private final Map<Long, List<Instant>> bids;
-
- /** When oldest active window starts. */
- private Instant windowStart;
-
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- bids = new TreeMap<>();
- windowStart = NexmarkUtils.BEGINNING_OF_TIME;
- }
-
- /**
- * Count bids per auction id for bids strictly before {@code end}. Add the auction ids with
- * the maximum number of bids to results.
- */
- private void countBids(Instant end) {
- Map<Long, Long> counts = new TreeMap<>();
- long maxCount = 0L;
- for (Map.Entry<Long, List<Instant>> entry : bids.entrySet()) {
- long count = 0L;
- long auction = entry.getKey();
- for (Instant bid : entry.getValue()) {
- if (bid.isBefore(end)) {
- count++;
- }
- }
- if (count > 0) {
- counts.put(auction, count);
- maxCount = Math.max(maxCount, count);
- }
- }
- for (Map.Entry<Long, Long> entry : counts.entrySet()) {
- long auction = entry.getKey();
- long count = entry.getValue();
- if (count == maxCount) {
- AuctionCount result = new AuctionCount(auction, count);
- addResult(TimestampedValue.of(result, end));
- }
- }
- }
-
- /**
- * Retire bids which are strictly before {@code cutoff}. Auctions left without any bids are
- * dropped from {@code bids} so that later window scans only visit auctions which still have
- * live bids. Return true if there are any bids remaining.
- */
- private boolean retireBids(Instant cutoff) {
- boolean anyRemain = false;
- Iterator<Map.Entry<Long, List<Instant>>> entries = bids.entrySet().iterator();
- while (entries.hasNext()) {
- Map.Entry<Long, List<Instant>> entry = entries.next();
- long auction = entry.getKey();
- Iterator<Instant> itr = entry.getValue().iterator();
- while (itr.hasNext()) {
- Instant bid = itr.next();
- if (bid.isBefore(cutoff)) {
- NexmarkUtils.info("retire: %s for %s", bid, auction);
- itr.remove();
- } else {
- anyRemain = true;
- }
- }
- if (entry.getValue().isEmpty()) {
- // captureBid creates a fresh list if this auction receives another bid later.
- entries.remove();
- }
- }
- return anyRemain;
- }
-
- /**
- * Retire active windows until we've reached {@code newWindowStart}.
- */
- private void retireWindows(Instant newWindowStart) {
- while (!newWindowStart.equals(windowStart)) {
- NexmarkUtils.info("retiring window %s, aiming for %s", windowStart, newWindowStart);
- // Count bids in the window [windowStart, windowStart + size).
- countBids(windowStart.plus(Duration.standardSeconds(configuration.windowSizeSec)));
- // Advance the window.
- windowStart = windowStart.plus(Duration.standardSeconds(configuration.windowPeriodSec));
- // Retire bids which will never contribute to a future window.
- if (!retireBids(windowStart)) {
- // Can fast forward to latest window since no more outstanding bids.
- windowStart = newWindowStart;
- }
- }
- }
-
- /**
- * Add bid to state.
- */
- private void captureBid(Bid bid, Instant timestamp) {
- List<Instant> existing = bids.get(bid.auction);
- if (existing == null) {
- existing = new ArrayList<>();
- bids.put(bid.auction, existing);
- }
- existing.add(timestamp);
- }
-
- @Override
- public void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- // Drain the remaining windows.
- retireWindows(NexmarkUtils.END_OF_TIME);
- allDone();
- return;
- }
-
- Event event = timestampedEvent.getValue();
- if (event.bid == null) {
- // Ignore non-bid events.
- return;
- }
- Instant timestamp = timestampedEvent.getTimestamp();
- Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
- Duration.standardSeconds(configuration.windowPeriodSec), timestamp);
- // Capture results from any windows we can now retire.
- retireWindows(newWindowStart);
- // Capture current bid.
- captureBid(event.bid, timestamp);
- }
- }
-
- public Query5Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValue(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6.java
deleted file mode 100644
index d5bcc30..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.google.common.collect.Lists;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query 6, 'Average Selling Price by Seller'. Select the average selling price over the
- * last 10 closed auctions by the same seller. In CQL syntax:
- *
- * <pre>{@code
- * SELECT Istream(AVG(Q.final), Q.seller)
- * FROM (SELECT Rstream(MAX(B.price) AS final, A.seller)
- * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- * WHERE A.id=B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- * GROUP BY A.id, A.seller) [PARTITION BY A.seller ROWS 10] Q
- * GROUP BY Q.seller;
- * }</pre>
- *
- * <p>We are a little more exact with selecting winning bids: see {@link WinningBids}.
- */
-class Query6 extends NexmarkQuery {
- /**
- * Combiner to keep track of up to {@code maxNumBids} of the most recent winning bids and calculate
- * their average selling price.
- */
- private static class MovingMeanSellingPrice extends Combine.CombineFn<Bid, List<Bid>, Long> {
- private final int maxNumBids;
-
- public MovingMeanSellingPrice(int maxNumBids) {
- this.maxNumBids = maxNumBids;
- }
-
- @Override
- public List<Bid> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<Bid> addInput(List<Bid> accumulator, Bid input) {
- accumulator.add(input);
- Collections.sort(accumulator, Bid.ASCENDING_TIME_THEN_PRICE);
- if (accumulator.size() > maxNumBids) {
- accumulator.remove(0);
- }
- return accumulator;
- }
-
- @Override
- public List<Bid> mergeAccumulators(Iterable<List<Bid>> accumulators) {
- List<Bid> result = new ArrayList<>();
- for (List<Bid> accumulator : accumulators) {
- for (Bid bid : accumulator) {
- result.add(bid);
- }
- }
- Collections.sort(result, Bid.ASCENDING_TIME_THEN_PRICE);
- if (result.size() > maxNumBids) {
- result = Lists.newArrayList(result.listIterator(result.size() - maxNumBids));
- }
- return result;
- }
-
- @Override
- public Long extractOutput(List<Bid> accumulator) {
- if (accumulator.isEmpty()) {
- return 0L;
- }
- long sumOfPrice = 0;
- for (Bid bid : accumulator) {
- sumOfPrice += bid.price;
- }
- return Math.round((double) sumOfPrice / accumulator.size());
- }
- }
-
- public Query6(NexmarkConfiguration configuration) {
- super(configuration, "Query6");
- }
-
- private PCollection<SellerPrice> applyTyped(PCollection<Event> events) {
- return events
- // Find the winning bid for each closed auction.
- .apply(new WinningBids(name + ".WinningBids", configuration))
-
- // Key the winning bid by the seller id.
- .apply(name + ".Rekey",
- ParDo.of(new DoFn<AuctionBid, KV<Long, Bid>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Auction auction = c.element().auction;
- Bid bid = c.element().bid;
- c.output(KV.of(auction.seller, bid));
- }
- }))
-
- // Re-window to update on every winning bid.
- .apply(
- Window.<KV<Long, Bid>>into(new GlobalWindows())
- .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
- .accumulatingFiredPanes()
- .withAllowedLateness(Duration.ZERO))
-
- // Find the average of last 10 winning bids for each seller.
- .apply(Combine.<Long, Bid, Long>perKey(new MovingMeanSellingPrice(10)))
-
- // Project into our datatype.
- .apply(name + ".Select",
- ParDo.of(new DoFn<KV<Long, Long>, SellerPrice>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(new SellerPrice(c.element().getKey(), c.element().getValue()));
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java
deleted file mode 100644
index d03f0fe..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query6Model.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
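- * A direct implementation of {@link Query6}.
- *
- * <p>Note that this model averages over every winning bid of a seller (it keeps a cumulative
- * count and total), whereas {@link Query6} averages over the last 10 winning bids only, so the
- * two agree only while a seller has at most 10 winning bids.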
- */
-public class Query6Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 6.
- */
- private static class Simulator extends AbstractSimulator<AuctionBid, SellerPrice> {
- /** The cumulative count of winning bids, indexed by seller id. */
- private final Map<Long, Long> numWinningBidsPerSeller;
-
- /** The cumulative total of winning bid prices, indexed by seller id. */
- private final Map<Long, Long> totalWinningBidPricesPerSeller;
-
- private Instant lastTimestamp;
-
- public Simulator(NexmarkConfiguration configuration) {
- super(new WinningBidsSimulator(configuration).results());
- numWinningBidsPerSeller = new TreeMap<>();
- totalWinningBidPricesPerSeller = new TreeMap<>();
- lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * Update the per-seller running counts/sums.
- */
- private void captureWinningBid(Auction auction, Bid bid, Instant timestamp) {
- NexmarkUtils.info("winning auction, bid: %s, %s", auction, bid);
- Long count = numWinningBidsPerSeller.get(auction.seller);
- if (count == null) {
- count = 1L;
- } else {
- count += 1;
- }
- numWinningBidsPerSeller.put(auction.seller, count);
- Long total = totalWinningBidPricesPerSeller.get(auction.seller);
- if (total == null) {
- total = bid.price;
- } else {
- total += bid.price;
- }
- totalWinningBidPricesPerSeller.put(auction.seller, total);
- TimestampedValue<SellerPrice> intermediateResult = TimestampedValue.of(
- new SellerPrice(auction.seller, Math.round((double) total / count)), timestamp);
- addIntermediateResult(intermediateResult);
- }
-
-
- @Override
- protected void run() {
- TimestampedValue<AuctionBid> timestampedWinningBid = nextInput();
- if (timestampedWinningBid == null) {
- for (long seller : numWinningBidsPerSeller.keySet()) {
- long count = numWinningBidsPerSeller.get(seller);
- long total = totalWinningBidPricesPerSeller.get(seller);
- addResult(TimestampedValue.of(
- new SellerPrice(seller, Math.round((double) total / count)), lastTimestamp));
- }
- allDone();
- return;
- }
-
- lastTimestamp = timestampedWinningBid.getTimestamp();
- captureWinningBid(timestampedWinningBid.getValue().auction,
- timestampedWinningBid.getValue().bid, lastTimestamp);
- }
- }
-
- public Query6Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected Iterable<TimestampedValue<KnownSize>> relevantResults(
- Iterable<TimestampedValue<KnownSize>> results) {
- // Find the last (in processing time) reported average price for each seller.
- Map<Long, TimestampedValue<KnownSize>> finalAverages = new TreeMap<>();
- for (TimestampedValue<KnownSize> obj : results) {
- Assert.assertTrue("have SellerPrice", obj.getValue() instanceof SellerPrice);
- SellerPrice sellerPrice = (SellerPrice) obj.getValue();
- finalAverages.put(
- sellerPrice.seller, TimestampedValue.of((KnownSize) sellerPrice, obj.getTimestamp()));
- }
- return finalAverages.values();
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValue(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7.java
deleted file mode 100644
index 7c51c18..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import org.joda.time.Duration;
-
-/**
- * Query 7, 'Highest Bid'. Select the bids with the highest bid
- * price in the last minute. In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(B.auction, B.price, B.bidder)
- * FROM Bid [RANGE 1 MINUTE SLIDE 1 MINUTE] B
- * WHERE B.price = (SELECT MAX(B1.price)
- * FROM BID [RANGE 1 MINUTE SLIDE 1 MINUTE] B1);
- * </pre>
- *
- * <p>We will use a shorter window to help make testing easier. We'll also implement this using
- * a side-input in order to exercise that functionality. (A combiner, as used in Query 5, is
- * a more efficient approach.)
- */
-class Query7 extends NexmarkQuery {
- public Query7(NexmarkConfiguration configuration) {
- super(configuration, "Query7");
- }
-
- private PCollection<Bid> applyTyped(PCollection<Event> events) {
- // Window the bids.
- PCollection<Bid> slidingBids = events.apply(JUST_BIDS).apply(
- Window.<Bid>into(FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))));
-
- // Find the largest price in all bids.
- // NOTE: It would be more efficient to write this query much as we did for Query5, using
- // a binary combiner to accumulate the bids with maximal price. As written this query
- // requires an additional scan per window, with the associated cost of snapshotted state and
- // its I/O. We'll keep this implementation since it illustrates the use of side inputs.
- final PCollectionView<Long> maxPriceView =
- slidingBids //
- .apply("BidToPrice", BID_TO_PRICE)
- .apply(Max.longsGlobally().withFanout(configuration.fanout).asSingletonView());
-
- return slidingBids
- // Select all bids which have that maximum price (there may be more than one).
- .apply(name + ".Select",
- ParDo.withSideInputs(maxPriceView)
- .of(new DoFn<Bid, Bid>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- long maxPrice = c.sideInput(maxPriceView);
- Bid bid = c.element();
- if (bid.price == maxPrice) {
- c.output(bid);
- }
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
deleted file mode 100644
index 0033c68..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query7Model.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query7}.
- */
-public class Query7Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 7.
- */
- private class Simulator extends AbstractSimulator<Event, Bid> {
- /** Bids with highest bid price seen in the current window. */
- private final List<Bid> highestBids;
-
- /** When current window started. */
- private Instant windowStart;
-
- private Instant lastTimestamp;
-
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- highestBids = new ArrayList<>();
- windowStart = NexmarkUtils.BEGINNING_OF_TIME;
- lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * Transfer the currently winning bids into results and retire them.
- */
- private void retireWindow(Instant timestamp) {
- for (Bid bid : highestBids) {
- addResult(TimestampedValue.of(bid, timestamp));
- }
- highestBids.clear();
- }
-
- /**
- * Keep only the bids with the highest price seen so far in the current window.
- *
- * <p>Every bid retained in {@code highestBids} shares the same price, so the first retained bid
- * decides the outcome: a higher retained price rejects {@code bid}, an equal price keeps the
- * retained bids and adds {@code bid} alongside them, and a lower retained price evicts all
- * retained bids. Ties are kept because {@link Query7} emits every bid whose price equals the
- * maximum price of the window.
- */
- private void captureBid(Bid bid) {
- Iterator<Bid> itr = highestBids.iterator();
- boolean isWinning = true;
- while (itr.hasNext()) {
- Bid existingBid = itr.next();
- if (existingBid.price > bid.price) {
- isWinning = false;
- break;
- }
- if (existingBid.price == bid.price) {
- // Tie with the current maximum: keep the retained bids and add this one as well.
- break;
- }
- NexmarkUtils.info("smaller price: %s", existingBid);
- itr.remove();
- }
- if (isWinning) {
- NexmarkUtils.info("larger price: %s", bid);
- highestBids.add(bid);
- }
- }
-
- @Override
- protected void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- // Capture all remaining bids in results.
- retireWindow(lastTimestamp);
- allDone();
- return;
- }
-
- Event event = timestampedEvent.getValue();
- if (event.bid == null) {
- // Ignore non-bid events.
- return;
- }
- lastTimestamp = timestampedEvent.getTimestamp();
- Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
- Duration.standardSeconds(configuration.windowSizeSec), lastTimestamp);
- if (!newWindowStart.equals(windowStart)) {
- // Capture highest priced bids in current window and retire it.
- retireWindow(lastTimestamp);
- windowStart = newWindowStart;
- }
- // Keep only the highest bids.
- captureBid(event.bid);
- // TODO: test fails because of an offset of some hundreds of ms between expected and actual.
- }
- }
-
- public Query7Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValueOrder(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8.java
deleted file mode 100644
index ee5c26c..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-
-/**
- * Query 8, 'Monitor New Users'. Select people who have entered the system and created auctions
- * in the last 12 hours, updated every 12 hours. In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(P.id, P.name, A.reserve)
- * FROM Person [RANGE 12 HOUR] P, Auction [RANGE 12 HOUR] A
- * WHERE P.id = A.seller;
- * </pre>
- *
- * <p>To make things a bit more dynamic and easier to test we'll use a much shorter window.
- */
-class Query8 extends NexmarkQuery {
- public Query8(NexmarkConfiguration configuration) {
- super(configuration, "Query8");
- }
-
- private PCollection<IdNameReserve> applyTyped(PCollection<Event> events) {
- // Window and key new people by their id.
- PCollection<KV<Long, Person>> personsById =
- events
- .apply(JUST_NEW_PERSONS)
- .apply("Query8.WindowPersons",
- Window.<Person>into(
- FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))))
- .apply("PersonById", PERSON_BY_ID);
-
- // Window and key new auctions by their seller id.
- PCollection<KV<Long, Auction>> auctionsBySeller =
- events.apply(JUST_NEW_AUCTIONS)
- .apply("Query8.WindowAuctions",
- Window.<Auction>into(
- FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec))))
- .apply("AuctionBySeller", AUCTION_BY_SELLER);
-
- // Join people and auctions and project the person id, name and auction reserve price.
- return KeyedPCollectionTuple.of(PERSON_TAG, personsById)
- .and(AUCTION_TAG, auctionsBySeller)
- .apply(CoGroupByKey.<Long>create())
- .apply(name + ".Select",
- ParDo.of(new DoFn<KV<Long, CoGbkResult>, IdNameReserve>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Person person = c.element().getValue().getOnly(PERSON_TAG, null);
- if (person == null) {
- // Person was not created in last window period.
- return;
- }
- for (Auction auction : c.element().getValue().getAll(AUCTION_TAG)) {
- c.output(new IdNameReserve(person.id, person.name, auction.reserve));
- }
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
deleted file mode 100644
index 261e383..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query8Model.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query8}.
- */
-public class Query8Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 8.
- */
- private class Simulator extends AbstractSimulator<Event, IdNameReserve> {
- /** New persons seen in the current window, indexed by id. */
- private final Map<Long, Person> newPersons;
-
- /** New auctions seen in the current window, indexed by seller id. */
- private final Multimap<Long, Auction> newAuctions;
-
- /** When did the current window start. */
- private Instant windowStart;
-
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- newPersons = new HashMap<>();
- newAuctions = ArrayListMultimap.create();
- windowStart = NexmarkUtils.BEGINNING_OF_TIME;
- }
-
- /**
- * Retire all persons added in last window.
- */
- private void retirePersons() {
- for (Map.Entry<Long, Person> entry : newPersons.entrySet()) {
- NexmarkUtils.info("retire: %s", entry.getValue());
- }
- newPersons.clear();
- }
-
- /**
- * Retire all auctions added in last window.
- */
- private void retireAuctions() {
- for (Map.Entry<Long, Auction> entry : newAuctions.entries()) {
- NexmarkUtils.info("retire: %s", entry.getValue());
- }
- newAuctions.clear();
- }
-
- /**
- * Capture new result.
- */
- private void addResult(Auction auction, Person person, Instant timestamp) {
- addResult(TimestampedValue.of(
- new IdNameReserve(person.id, person.name, auction.reserve), timestamp));
- }
-
- @Override
- public void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- allDone();
- return;
- }
-
- Event event = timestampedEvent.getValue();
- if (event.bid != null) {
- // Ignore bid events.
- // Keep looking for next events.
- return;
- }
- Instant timestamp = timestampedEvent.getTimestamp();
- Instant newWindowStart = windowStart(Duration.standardSeconds(configuration.windowSizeSec),
- Duration.standardSeconds(configuration.windowSizeSec), timestamp);
- if (!newWindowStart.equals(windowStart)) {
- // Retire this window.
- retirePersons();
- retireAuctions();
- windowStart = newWindowStart;
- }
-
- if (event.newAuction != null) {
- // Join new auction with existing person, if any.
- Person person = newPersons.get(event.newAuction.seller);
- if (person != null) {
- addResult(event.newAuction, person, timestamp);
- } else {
- // Remember auction for future new people.
- newAuctions.put(event.newAuction.seller, event.newAuction);
- }
- } else { // event is not an auction, nor a bid, so it is a person
- // Join new person with existing auctions.
- for (Auction auction : newAuctions.get(event.newPerson.id)) {
- addResult(auction, event.newPerson, timestamp);
- }
- // We'll never need these auctions again.
- newAuctions.removeAll(event.newPerson.id);
- // Remember person for future auctions.
- newPersons.put(event.newPerson.id, event.newPerson);
- }
- }
- }
-
- public Query8Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValue(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9.java
deleted file mode 100644
index 64bf653..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query "9", 'Winning bids'. Select just the winning bids. Not in original NEXMark suite, but
- * handy for testing. See {@link WinningBids} for the details.
- */
-class Query9 extends NexmarkQuery {
- public Query9(NexmarkConfiguration configuration) {
- super(configuration, "Query9");
- }
-
- private PCollection<AuctionBid> applyTyped(PCollection<Event> events) {
- return events.apply(new WinningBids(name, configuration));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java
deleted file mode 100644
index 338f02a..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query9Model.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query9}.
- */
-public class Query9Model extends NexmarkQueryModel implements Serializable {
- public Query9Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new WinningBidsSimulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValue(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
deleted file mode 100644
index 4081287..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/SellerPrice.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.VarLongCoder;
-
-/**
- * Result of {@link Query6}.
- */
-public class SellerPrice implements KnownSize, Serializable {
- private static final Coder<Long> LONG_CODER = VarLongCoder.of();
-
- public static final Coder<SellerPrice> CODER = new AtomicCoder<SellerPrice>() {
- @Override
- public void encode(SellerPrice value, OutputStream outStream,
- Coder.Context context)
- throws CoderException, IOException {
- LONG_CODER.encode(value.seller, outStream, Context.NESTED);
- LONG_CODER.encode(value.price, outStream, Context.NESTED);
- }
-
- @Override
- public SellerPrice decode(
- InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- long seller = LONG_CODER.decode(inStream, Context.NESTED);
- long price = LONG_CODER.decode(inStream, Context.NESTED);
- return new SellerPrice(seller, price);
- }
- };
-
- @JsonProperty
- public final long seller;
-
- /** Price in cents. */
- @JsonProperty
- public final long price;
-
- // For Avro only.
- @SuppressWarnings("unused")
- private SellerPrice() {
- seller = 0;
- price = 0;
- }
-
- public SellerPrice(long seller, long price) {
- this.seller = seller;
- this.price = price;
- }
-
- @Override
- public long sizeInBytes() {
- return 8 + 8;
- }
-
- @Override
- public String toString() {
- try {
- return NexmarkUtils.MAPPER.writeValueAsString(this);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-}