You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:37 UTC
[29/55] [abbrv] beam git commit: Move WinningBids into the queries package
Move WinningBids into the queries package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a39cb800
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a39cb800
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a39cb800
Branch: refs/heads/master
Commit: a39cb80009f569e1c8ba82ee9c67a7c5dbe3d16f
Parents: a6dbdfa
Author: Ismaël Mejía <ie...@apache.org>
Authored: Sun Apr 30 17:44:07 2017 +0200
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:28 2017 +0200
----------------------------------------------------------------------
.../integration/nexmark/AbstractSimulator.java | 210 ----------
.../beam/integration/nexmark/NexmarkQuery.java | 267 -------------
.../integration/nexmark/NexmarkQueryModel.java | 122 ------
.../beam/integration/nexmark/NexmarkRunner.java | 2 +
.../beam/integration/nexmark/WinningBids.java | 377 ------------------
.../nexmark/WinningBidsSimulator.java | 205 ----------
.../integration/nexmark/model/AuctionBid.java | 3 +-
.../nexmark/queries/AbstractSimulator.java | 211 +++++++++++
.../nexmark/queries/NexmarkQuery.java | 270 +++++++++++++
.../nexmark/queries/NexmarkQueryModel.java | 123 ++++++
.../integration/nexmark/queries/Query0.java | 1 -
.../nexmark/queries/Query0Model.java | 4 +-
.../integration/nexmark/queries/Query1.java | 1 -
.../integration/nexmark/queries/Query10.java | 1 -
.../integration/nexmark/queries/Query11.java | 1 -
.../integration/nexmark/queries/Query12.java | 1 -
.../nexmark/queries/Query1Model.java | 2 -
.../integration/nexmark/queries/Query2.java | 1 -
.../nexmark/queries/Query2Model.java | 2 -
.../integration/nexmark/queries/Query3.java | 1 -
.../nexmark/queries/Query3Model.java | 2 -
.../integration/nexmark/queries/Query4.java | 2 -
.../nexmark/queries/Query4Model.java | 3 -
.../integration/nexmark/queries/Query5.java | 1 -
.../nexmark/queries/Query5Model.java | 2 -
.../integration/nexmark/queries/Query6.java | 2 -
.../nexmark/queries/Query6Model.java | 3 -
.../integration/nexmark/queries/Query7.java | 1 -
.../nexmark/queries/Query7Model.java | 2 -
.../integration/nexmark/queries/Query8.java | 1 -
.../nexmark/queries/Query8Model.java | 2 -
.../integration/nexmark/queries/Query9.java | 2 -
.../nexmark/queries/Query9Model.java | 3 -
.../nexmark/queries/WinningBids.java | 379 +++++++++++++++++++
.../nexmark/queries/WinningBidsSimulator.java | 207 ++++++++++
.../integration/nexmark/queries/QueryTest.java | 2 -
36 files changed, 1194 insertions(+), 1225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
deleted file mode 100644
index b012842..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Abstract base class for simulator of a query.
- *
- * @param <InputT> Type of input elements.
- * @param <OutputT> Type of output elements.
- */
-public abstract class AbstractSimulator<InputT, OutputT> {
- /** Window size for action bucket sampling. */
- public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
-
- /** Input event stream we should draw from. */
- private final Iterator<TimestampedValue<InputT>> input;
-
- /** Set to true when no more results. */
- private boolean isDone;
-
- /**
- * Results which have not yet been returned by the {@link #results} iterator.
- */
- private final List<TimestampedValue<OutputT>> pendingResults;
-
- /**
- * Current window timestamp (ms since epoch).
- */
- private long currentWindow;
-
- /**
- * Number of (possibly intermediate) results for the current window.
- */
- private long currentCount;
-
- /**
- * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
- * iterator.
- */
- private final List<Long> pendingCounts;
-
- public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
- this.input = input;
- isDone = false;
- pendingResults = new ArrayList<>();
- currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
- currentCount = 0;
- pendingCounts = new ArrayList<>();
- }
-
- /** Called by implementors of {@link #run}: Fetch the next input element. */
- @Nullable
- protected TimestampedValue<InputT> nextInput() {
- if (!input.hasNext()) {
- return null;
- }
- TimestampedValue<InputT> timestampedInput = input.next();
- NexmarkUtils.info("input: %s", timestampedInput);
- return timestampedInput;
- }
-
- /**
- * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of
- * recording the expected activity of the query over time.
- */
- protected void addIntermediateResult(TimestampedValue<OutputT> result) {
- NexmarkUtils.info("intermediate result: %s", result);
- updateCounts(result.getTimestamp());
- }
-
- /**
- * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
- * semantic correctness.
- */
- protected void addResult(TimestampedValue<OutputT> result) {
- NexmarkUtils.info("result: %s", result);
- pendingResults.add(result);
- updateCounts(result.getTimestamp());
- }
-
- /**
- * Update window and counts.
- */
- private void updateCounts(Instant timestamp) {
- long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
- if (window > currentWindow) {
- if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
- pendingCounts.add(currentCount);
- }
- currentCount = 0;
- currentWindow = window;
- }
- currentCount++;
- }
-
- /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
- protected void allDone() {
- isDone = true;
- }
-
- /**
- * Overridden by derived classes to do the next increment of work. Each call should
- * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
- * or {@link #allDone}. It is ok for a single call to emit more than one result via
- * {@link #addResult}. It is ok for a single call to run the entire simulation, though
- * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to
- * stall.
- */
- protected abstract void run();
-
- /**
- * Return iterator over all expected timestamped results. The underlying simulator state is
- * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
- */
- public Iterator<TimestampedValue<OutputT>> results() {
- return new Iterator<TimestampedValue<OutputT>>() {
- @Override
- public boolean hasNext() {
- while (true) {
- if (!pendingResults.isEmpty()) {
- return true;
- }
- if (isDone) {
- return false;
- }
- run();
- }
- }
-
- @Override
- public TimestampedValue<OutputT> next() {
- TimestampedValue<OutputT> result = pendingResults.get(0);
- pendingResults.remove(0);
- return result;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- /**
- * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
- * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be
- * called.
- */
- public Iterator<Long> resultsPerWindow() {
- return new Iterator<Long>() {
- @Override
- public boolean hasNext() {
- while (true) {
- if (!pendingCounts.isEmpty()) {
- return true;
- }
- if (isDone) {
- if (currentCount > 0) {
- pendingCounts.add(currentCount);
- currentCount = 0;
- currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
- return true;
- } else {
- return false;
- }
- }
- run();
- }
- }
-
- @Override
- public Long next() {
- Long result = pendingCounts.get(0);
- pendingCounts.remove(0);
- return result;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
deleted file mode 100644
index ab1c305..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
- * multiple queries.
- */
-public abstract class NexmarkQuery
- extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
- protected static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
- protected static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
- protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
-
- /** Predicate to detect a new person event. */
- protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
- new SerializableFunction<Event, Boolean>() {
- @Override
- public Boolean apply(Event event) {
- return event.newPerson != null;
- }
- };
-
- /** DoFn to convert a new person event to a person. */
- protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().newPerson);
- }
- };
-
- /** Predicate to detect a new auction event. */
- protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
- new SerializableFunction<Event, Boolean>() {
- @Override
- public Boolean apply(Event event) {
- return event.newAuction != null;
- }
- };
-
- /** DoFn to convert a new auction event to an auction. */
- protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().newAuction);
- }
- };
-
- /** Predicate to detect a new bid event. */
- protected static final SerializableFunction<Event, Boolean> IS_BID =
- new SerializableFunction<Event, Boolean>() {
- @Override
- public Boolean apply(Event event) {
- return event.bid != null;
- }
- };
-
- /** DoFn to convert a bid event to a bid. */
- protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().bid);
- }
- };
-
- /** Transform to key each person by their id. */
- protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
- ParDo.of(new DoFn<Person, KV<Long, Person>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().id, c.element()));
- }
- });
-
- /** Transform to key each auction by its id. */
- protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
- ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().id, c.element()));
- }
- });
-
- /** Transform to key each auction by its seller id. */
- protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
- ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().seller, c.element()));
- }
- });
-
- /** Transform to key each bid by it's auction id. */
- protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
- ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().auction, c.element()));
- }
- });
-
- /** Transform to project the auction id from each bid. */
- protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
- ParDo.of(new DoFn<Bid, Long>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().auction);
- }
- });
-
- /** Transform to project the price from each bid. */
- protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
- ParDo.of(new DoFn<Bid, Long>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().price);
- }
- });
-
- /** Transform to emit each event with the timestamp embedded within it. */
- public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
- ParDo.of(new DoFn<Event, Event>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Event e = c.element();
- if (e.bid != null) {
- c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
- } else if (e.newPerson != null) {
- c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
- } else if (e.newAuction != null) {
- c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
- }
- }
- });
-
- /**
- * Transform to filter for just the new auction events.
- */
- protected static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
- new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
- @Override
- public PCollection<Auction> expand(PCollection<Event> input) {
- return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
- .apply("AsAuction", ParDo.of(AS_AUCTION));
- }
- };
-
- /**
- * Transform to filter for just the new person events.
- */
- protected static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
- new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
- @Override
- public PCollection<Person> expand(PCollection<Event> input) {
- return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
- .apply("AsPerson", ParDo.of(AS_PERSON));
- }
- };
-
- /**
- * Transform to filter for just the bid events.
- */
- protected static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
- new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
- @Override
- public PCollection<Bid> expand(PCollection<Event> input) {
- return input.apply("IsBid", Filter.by(IS_BID))
- .apply("AsBid", ParDo.of(AS_BID));
- }
- };
-
- protected final NexmarkConfiguration configuration;
- public final Monitor<Event> eventMonitor;
- public final Monitor<KnownSize> resultMonitor;
- public final Monitor<Event> endOfStreamMonitor;
- protected final Counter fatalCounter;
-
- protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
- super(name);
- this.configuration = configuration;
- if (configuration.debug) {
- eventMonitor = new Monitor<>(name + ".Events", "event");
- resultMonitor = new Monitor<>(name + ".Results", "result");
- endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
- fatalCounter = Metrics.counter(name , "fatal");
- } else {
- eventMonitor = null;
- resultMonitor = null;
- endOfStreamMonitor = null;
- fatalCounter = null;
- }
- }
-
- /**
- * Implement the actual query. All we know about the result is it has a known encoded size.
- */
- protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
-
- @Override
- public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) {
-
- if (configuration.debug) {
- events =
- events
- // Monitor events as they go by.
- .apply(name + ".Monitor", eventMonitor.getTransform())
- // Count each type of event.
- .apply(name + ".Snoop", NexmarkUtils.snoop(name));
- }
-
- if (configuration.cpuDelayMs > 0) {
- // Slow down by pegging one core at 100%.
- events = events.apply(name + ".CpuDelay",
- NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
- }
-
- if (configuration.diskBusyBytes > 0) {
- // Slow down by forcing bytes to durable store.
- events = events.apply(name + ".DiskBusy",
- NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
- }
-
- // Run the query.
- PCollection<KnownSize> queryResults = applyPrim(events);
-
- if (configuration.debug) {
- // Monitor results as they go by.
- queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform());
- }
-
- // Timestamp the query results.
- return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
deleted file mode 100644
index b2b1826..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQueryModel.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-
-import org.hamcrest.core.IsEqual;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
- * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
- * applied against the actual query results to check their consistency with the model.
- */
-public abstract class NexmarkQueryModel implements Serializable {
- protected final NexmarkConfiguration configuration;
-
- public NexmarkQueryModel(NexmarkConfiguration configuration) {
- this.configuration = configuration;
- }
-
- /**
- * Return the start of the most recent window of {@code size} and {@code period} which ends
- * strictly before {@code timestamp}.
- */
- public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
- long ts = timestamp.getMillis();
- long p = period.getMillis();
- long lim = ts - ts % p;
- long s = size.getMillis();
- return new Instant(lim - s);
- }
-
- /** Convert {@code itr} to strings capturing values, timestamps and order. */
- protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
- List<String> strings = new ArrayList<>();
- while (itr.hasNext()) {
- strings.add(itr.next().toString());
- }
- return strings;
- }
-
- /** Convert {@code itr} to strings capturing values and order. */
- protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
- List<String> strings = new ArrayList<>();
- while (itr.hasNext()) {
- strings.add(itr.next().getValue().toString());
- }
- return strings;
- }
-
- /** Convert {@code itr} to strings capturing values only. */
- protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
- Set<String> strings = new HashSet<>();
- while (itr.hasNext()) {
- strings.add(itr.next().getValue().toString());
- }
- return strings;
- }
-
- /** Return simulator for query. */
- protected abstract AbstractSimulator<?, ?> simulator();
-
- /** Return sub-sequence of results which are significant for model. */
- protected Iterable<TimestampedValue<KnownSize>> relevantResults(
- Iterable<TimestampedValue<KnownSize>> results) {
- return results;
- }
-
- /**
- * Convert iterator of elements to collection of strings to use when testing coherence of model
- * against actual query results.
- */
- protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
-
- /** Return assertion to use on results of pipeline for this query. */
- public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
- final Collection<String> expectedStrings = toCollection(simulator().results());
- final String[] expectedStringsArray =
- expectedStrings.toArray(new String[expectedStrings.size()]);
-
- return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
- @Override
- public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
- Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
- Assert.assertThat("wrong pipeline output", actualStrings,
- IsEqual.equalTo(expectedStrings));
-//compare without order
-// Assert.assertThat("wrong pipeline output", actualStrings,
-// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
- return null;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
index ebfd196..a3c4d33 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -38,6 +38,8 @@ import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.integration.nexmark.queries.NexmarkQuery;
+import org.apache.beam.integration.nexmark.queries.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.queries.Query0;
import org.apache.beam.integration.nexmark.queries.Query0Model;
import org.apache.beam.integration.nexmark.queries.Query1;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
deleted file mode 100644
index 3815b9d..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Instant;
-
-/**
- * A transform to find the winning bid for each closed auction. In pseudo CQL syntax:
- *
- * <pre>{@code
- * SELECT Rstream(A.*, B.auction, B.bidder, MAX(B.price), B.dateTime)
- * FROM Auction A [ROWS UNBOUNDED], Bid B [ROWS UNBOUNDED]
- * WHERE A.id = B.auction AND B.datetime < A.expires AND A.expires < CURRENT_TIME
- * GROUP BY A.id
- * }</pre>
- *
- * <p>We will also check that the winning bid is above the auction reserve. Note that
- * we ignore the auction opening bid value since it has no impact on which bid eventually wins,
- * if any.
- *
- * <p>Our implementation will use a custom windowing function in order to bring bids and
- * auctions together without requiring global state.
- */
-public class WinningBids extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
- /** Windows for open auctions and bids. */
- private static class AuctionOrBidWindow extends IntervalWindow implements Serializable {
- /** Id of auction this window is for. */
- public final long auction;
-
- /**
- * True if this window represents an actual auction, and thus has a start/end
- * time matching that of the auction. False if this window represents a bid, and
- * thus has an unbounded start/end time.
- */
- public final boolean isAuctionWindow;
-
- /** For avro only. */
- private AuctionOrBidWindow() {
- super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
- auction = 0;
- isAuctionWindow = false;
- }
-
- private AuctionOrBidWindow(
- Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
- super(start, end);
- this.auction = auctionId;
- this.isAuctionWindow = isAuctionWindow;
- }
-
- /** Return an auction window for {@code auction}. */
- public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
- AuctionOrBidWindow result =
- new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
- return result;
- }
-
- /**
- * Return a bid window for {@code bid}. It should later be merged into
- * the corresponding auction window. However, it is possible this bid is for an already
- * expired auction, or for an auction which the system has not yet seen. So we
- * give the bid a bit of wiggle room in its interval.
- */
- public static AuctionOrBidWindow forBid(
- long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
- // At this point we don't know which auctions are still valid, and the bid may
- // be for an auction which won't start until some unknown time in the future
- // (due to Generator.AUCTION_ID_LEAD in Generator.nextBid).
- // A real system would atomically reconcile bids and auctions by a separate mechanism.
- // If we give bids an unbounded window it is possible a bid for an auction which
- // has already expired would cause the system watermark to stall, since that window
- // would never be retired.
- // Instead, we will just give the bid a finite window which expires at
- // the upper bound of auctions assuming the auction starts at the same time as the bid,
- // and assuming the system is running at its lowest event rate (as per interEventDelayUs).
- AuctionOrBidWindow result = new AuctionOrBidWindow(
- timestamp, timestamp.plus(expectedAuctionDurationMs * 2), bid.auction, false);
- return result;
- }
-
- /** Is this an auction window? */
- public boolean isAuctionWindow() {
- return isAuctionWindow;
- }
-
- @Override
- public String toString() {
- return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}",
- start(), end(), auction, isAuctionWindow);
- }
- }
-
- /**
- * Encodes an {@link AuctionOrBidWindow} as an {@link IntervalWindow} and an auction id long.
- */
- private static class AuctionOrBidWindowCoder extends CustomCoder<AuctionOrBidWindow> {
- private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
- private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
- private static final Coder<Long> ID_CODER = VarLongCoder.of();
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
-
- @JsonCreator
- public static AuctionOrBidWindowCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(AuctionOrBidWindow window, OutputStream outStream, Coder.Context context)
- throws IOException, CoderException {
- SUPER_CODER.encode(window, outStream, Coder.Context.NESTED);
- ID_CODER.encode(window.auction, outStream, Coder.Context.NESTED);
- INT_CODER.encode(window.isAuctionWindow ? 1 : 0, outStream, Coder.Context.NESTED);
- }
-
- @Override
- public AuctionOrBidWindow decode(InputStream inStream, Coder.Context context)
- throws IOException, CoderException {
- IntervalWindow superWindow = SUPER_CODER.decode(inStream, Coder.Context.NESTED);
- long auction = ID_CODER.decode(inStream, Coder.Context.NESTED);
- boolean isAuctionWindow =
- INT_CODER.decode(inStream, Coder.Context.NESTED) == 0 ? false : true;
- return new AuctionOrBidWindow(
- superWindow.start(), superWindow.end(), auction, isAuctionWindow);
- }
-
- @Override public void verifyDeterministic() throws NonDeterministicException {}
- }
-
- /** Assign events to auction windows and merges them intelligently. */
- private static class AuctionOrBidWindowFn extends WindowFn<Event, AuctionOrBidWindow> {
- /** Expected duration of auctions in ms. */
- private final long expectedAuctionDurationMs;
-
- public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
- this.expectedAuctionDurationMs = expectedAuctionDurationMs;
- }
-
- @Override
- public Collection<AuctionOrBidWindow> assignWindows(AssignContext c) {
- Event event = c.element();
- if (event.newAuction != null) {
- // Assign auctions to an auction window which expires at the auction's close.
- return Arrays.asList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
- } else if (event.bid != null) {
- // Assign bids to a temporary bid window which will later be merged into the appropriate
- // auction window.
- return Arrays.asList(
- AuctionOrBidWindow.forBid(expectedAuctionDurationMs, c.timestamp(), event.bid));
- } else {
- // Don't assign people to any window. They will thus be dropped.
- return Arrays.asList();
- }
- }
-
- @Override
- public void mergeWindows(MergeContext c) throws Exception {
- // Split and index the auction and bid windows by auction id.
- Map<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<>();
- Map<Long, List<AuctionOrBidWindow>> idToBidAuctionWindows = new TreeMap<>();
- for (AuctionOrBidWindow window : c.windows()) {
- if (window.isAuctionWindow()) {
- idToTrueAuctionWindow.put(window.auction, window);
- } else {
- List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(window.auction);
- if (bidWindows == null) {
- bidWindows = new ArrayList<>();
- idToBidAuctionWindows.put(window.auction, bidWindows);
- }
- bidWindows.add(window);
- }
- }
-
- // Merge all 'bid' windows into their corresponding 'auction' window, provided the
- // auction has not expired.
- for (long auction : idToTrueAuctionWindow.keySet()) {
- AuctionOrBidWindow auctionWindow = idToTrueAuctionWindow.get(auction);
- List<AuctionOrBidWindow> bidWindows = idToBidAuctionWindows.get(auction);
- if (bidWindows != null) {
- List<AuctionOrBidWindow> toBeMerged = new ArrayList<>();
- for (AuctionOrBidWindow bidWindow : bidWindows) {
- if (bidWindow.start().isBefore(auctionWindow.end())) {
- toBeMerged.add(bidWindow);
- }
- // else: This bid window will remain until its expire time, at which point it
- // will expire without ever contributing to an output.
- }
- if (!toBeMerged.isEmpty()) {
- toBeMerged.add(auctionWindow);
- c.merge(toBeMerged, auctionWindow);
- }
- }
- }
- }
-
- @Override
- public boolean isCompatible(WindowFn<?, ?> other) {
- return other instanceof AuctionOrBidWindowFn;
- }
-
- @Override
- public Coder<AuctionOrBidWindow> windowCoder() {
- return AuctionOrBidWindowCoder.of();
- }
-
- @Override
- public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
- throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
- }
-
- /**
- * Below we will GBK auctions and bids on their auction ids. Then we will reduce those
- * per id to emit {@code (auction, winning bid)} pairs for auctions which have expired with at
- * least one valid bid. We would like those output pairs to have a timestamp of the auction's
- * expiry (since that's the earliest we know for sure we have the correct winner). We would
- * also like to make that winning results are available to following stages at the auction's
- * expiry.
- *
- * <p>Each result of the GBK will have a timestamp of the min of the result of this object's
- * assignOutputTime over all records which end up in one of its iterables. Thus we get the
- * desired behavior if we ignore each record's timestamp and always return the auction window's
- * 'maxTimestamp', which will correspond to the auction's expiry.
- *
- * <p>In contrast, if this object's assignOutputTime were to return 'inputTimestamp'
- * (the usual implementation), then each GBK record will take as its timestamp the minimum of
- * the timestamps of all bids and auctions within it, which will always be the auction's
- * timestamp. An auction which expires well into the future would thus hold up the watermark
- * of the GBK results until that auction expired. That in turn would hold up all winning pairs.
- */
- @Override
- public Instant getOutputTime(
- Instant inputTimestamp, AuctionOrBidWindow window) {
- return window.maxTimestamp();
- }
- }
-
- private final AuctionOrBidWindowFn auctionOrBidWindowFn;
-
- public WinningBids(String name, NexmarkConfiguration configuration) {
- super(name);
- // What's the expected auction time (when the system is running at the lowest event rate).
- long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(
- configuration.firstEventRate, configuration.nextEventRate,
- configuration.rateUnit, configuration.numEventGenerators);
- long longestDelayUs = 0;
- for (int i = 0; i < interEventDelayUs.length; i++) {
- longestDelayUs = Math.max(longestDelayUs, interEventDelayUs[i]);
- }
- // Adjust for proportion of auction events amongst all events.
- longestDelayUs =
- (longestDelayUs * GeneratorConfig.PROPORTION_DENOMINATOR)
- / GeneratorConfig.AUCTION_PROPORTION;
- // Adjust for number of in-flight auctions.
- longestDelayUs = longestDelayUs * configuration.numInFlightAuctions;
- long expectedAuctionDurationMs = (longestDelayUs + 999) / 1000;
- NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
- auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
- }
-
- @Override
- public PCollection<AuctionBid> expand(PCollection<Event> events) {
- // Window auctions and bids into custom auction windows. New people events will be discarded.
- // This will allow us to bring bids and auctions together irrespective of how long
- // each auction is open for.
- events = events.apply("Window", Window.into(auctionOrBidWindowFn));
-
- // Key auctions by their id.
- PCollection<KV<Long, Auction>> auctionsById =
- events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)
- .apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
-
- // Key bids by their auction id.
- PCollection<KV<Long, Bid>> bidsByAuctionId =
- events.apply(NexmarkQuery.JUST_BIDS).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
-
- // Find the highest price valid bid for each closed auction.
- return
- // Join auctions and bids.
- KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, auctionsById)
- .and(NexmarkQuery.BID_TAG, bidsByAuctionId)
- .apply(CoGroupByKey.<Long>create())
- // Filter and select.
- .apply(name + ".Join",
- ParDo.of(new DoFn<KV<Long, CoGbkResult>, AuctionBid>() {
- private final Counter noAuctionCounter = Metrics.counter(name, "noAuction");
- private final Counter underReserveCounter = Metrics.counter(name, "underReserve");
- private final Counter noValidBidsCounter = Metrics.counter(name, "noValidBids");
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- Auction auction =
- c.element().getValue().getOnly(NexmarkQuery.AUCTION_TAG, null);
- if (auction == null) {
- // We have bids without a matching auction. Give up.
- noAuctionCounter.inc();
- return;
- }
- // Find the current winning bid for auction.
- // The earliest bid with the maximum price above the reserve wins.
- Bid bestBid = null;
- for (Bid bid : c.element().getValue().getAll(NexmarkQuery.BID_TAG)) {
- // Bids too late for their auction will have been
- // filtered out by the window merge function.
- checkState(bid.dateTime < auction.expires);
- if (bid.price < auction.reserve) {
- // Bid price is below auction reserve.
- underReserveCounter.inc();
- continue;
- }
-
- if (bestBid == null
- || Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) > 0) {
- bestBid = bid;
- }
- }
- if (bestBid == null) {
- // We don't have any valid bids for auction.
- noValidBidsCounter.inc();
- return;
- }
- c.output(new AuctionBid(auction, bestBid));
- }
- }
- ));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
deleted file mode 100644
index e7f51b7..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.AuctionBid;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A simulator of the {@code WinningBids} query.
- */
-public class WinningBidsSimulator extends AbstractSimulator<Event, AuctionBid> {
- /** Auctions currently still open, indexed by auction id. */
- private final Map<Long, Auction> openAuctions;
-
- /** The ids of auctions known to be closed. */
- private final Set<Long> closedAuctions;
-
- /** Current best valid bids for open auctions, indexed by auction id. */
- private final Map<Long, Bid> bestBids;
-
- /** Bids for auctions we havn't seen yet. */
- private final List<Bid> bidsWithoutAuctions;
-
- /**
- * Timestamp of last new auction or bid event (ms since epoch).
- */
- private long lastTimestamp;
-
- public WinningBidsSimulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- openAuctions = new TreeMap<>();
- closedAuctions = new TreeSet<>();
- bestBids = new TreeMap<>();
- bidsWithoutAuctions = new ArrayList<>();
- lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
- }
-
- /**
- * Try to account for {@code bid} in state. Return true if bid has now been
- * accounted for by {@code bestBids}.
- */
- private boolean captureBestBid(Bid bid, boolean shouldLog) {
- if (closedAuctions.contains(bid.auction)) {
- // Ignore bids for known, closed auctions.
- if (shouldLog) {
- NexmarkUtils.info("closed auction: %s", bid);
- }
- return true;
- }
- Auction auction = openAuctions.get(bid.auction);
- if (auction == null) {
- // We don't have an auction for this bid yet, so can't determine if it is
- // winning or not.
- if (shouldLog) {
- NexmarkUtils.info("pending auction: %s", bid);
- }
- return false;
- }
- if (bid.price < auction.reserve) {
- // Bid price is too low.
- if (shouldLog) {
- NexmarkUtils.info("below reserve: %s", bid);
- }
- return true;
- }
- Bid existingBid = bestBids.get(bid.auction);
- if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
- // We've found a (new) best bid for a known auction.
- bestBids.put(bid.auction, bid);
- if (shouldLog) {
- NexmarkUtils.info("new winning bid: %s", bid);
- }
- } else {
- if (shouldLog) {
- NexmarkUtils.info("ignoring low bid: %s", bid);
- }
- }
- return true;
- }
-
- /**
- * Try to match bids without auctions to auctions.
- */
- private void flushBidsWithoutAuctions() {
- Iterator<Bid> itr = bidsWithoutAuctions.iterator();
- while (itr.hasNext()) {
- Bid bid = itr.next();
- if (captureBestBid(bid, false)) {
- NexmarkUtils.info("bid now accounted for: %s", bid);
- itr.remove();
- }
- }
- }
-
- /**
- * Return the next winning bid for an expired auction relative to {@code timestamp}.
- * Return null if no more winning bids, in which case all expired auctions will
- * have been removed from our state. Retire auctions in order of expire time.
- */
- @Nullable
- private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
- Map<Long, List<Long>> toBeRetired = new TreeMap<>();
- for (Map.Entry<Long, Auction> entry : openAuctions.entrySet()) {
- if (entry.getValue().expires <= timestamp) {
- List<Long> idsAtTime = toBeRetired.get(entry.getValue().expires);
- if (idsAtTime == null) {
- idsAtTime = new ArrayList<>();
- toBeRetired.put(entry.getValue().expires, idsAtTime);
- }
- idsAtTime.add(entry.getKey());
- }
- }
- for (Map.Entry<Long, List<Long>> entry : toBeRetired.entrySet()) {
- for (long id : entry.getValue()) {
- Auction auction = openAuctions.get(id);
- NexmarkUtils.info("retiring auction: %s", auction);
- openAuctions.remove(id);
- Bid bestBid = bestBids.get(id);
- if (bestBid != null) {
- TimestampedValue<AuctionBid> result =
- TimestampedValue.of(new AuctionBid(auction, bestBid), new Instant(auction.expires));
- NexmarkUtils.info("winning: %s", result);
- return result;
- }
- }
- }
- return null;
- }
-
- @Override
- protected void run() {
- if (lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
- // We may have finally seen the auction a bid was intended for.
- flushBidsWithoutAuctions();
- TimestampedValue<AuctionBid> result = nextWinningBid(lastTimestamp);
- if (result != null) {
- addResult(result);
- return;
- }
- }
-
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- // No more events. Flush any still open auctions.
- TimestampedValue<AuctionBid> result =
- nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
- if (result == null) {
- // We are done.
- allDone();
- return;
- }
- addResult(result);
- //TODO test fails because offset of some hundreds of ms beween expect and actual
- return;
- }
-
- Event event = timestampedEvent.getValue();
- if (event.newPerson != null) {
- // Ignore new person events.
- return;
- }
-
- lastTimestamp = timestampedEvent.getTimestamp().getMillis();
- if (event.newAuction != null) {
- // Add this new open auction to our state.
- openAuctions.put(event.newAuction.id, event.newAuction);
- } else {
- if (!captureBestBid(event.bid, true)) {
- // We don't know what to do with this bid yet.
- NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
- bidsWithoutAuctions.add(event.bid);
- }
- }
- // Keep looking for winning bids.
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
index 7f6b7c9..b1d9ec2 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -24,13 +24,12 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.WinningBids;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
/**
- * Result of {@link WinningBids} transform.
+ * Result of {@link org.apache.beam.integration.nexmark.queries.WinningBids} transform.
*/
public class AuctionBid implements KnownSize, Serializable {
public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
new file mode 100644
index 0000000..270b5c3
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Abstract base class for simulator of a query.
+ *
+ * @param <InputT> Type of input elements.
+ * @param <OutputT> Type of output elements.
+ */
+public abstract class AbstractSimulator<InputT, OutputT> {
+ /** Window size for action bucket sampling. */
+ public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+
+ /** Input event stream we should draw from. */
+ private final Iterator<TimestampedValue<InputT>> input;
+
+ /** Set to true when no more results. */
+ private boolean isDone;
+
+ /**
+ * Results which have not yet been returned by the {@link #results} iterator.
+ */
+ private final List<TimestampedValue<OutputT>> pendingResults;
+
+ /**
+ * Current window timestamp (ms since epoch).
+ */
+ private long currentWindow;
+
+ /**
+ * Number of (possibly intermediate) results for the current window.
+ */
+ private long currentCount;
+
+ /**
+ * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
+ * iterator.
+ */
+ private final List<Long> pendingCounts;
+
+ public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
+ this.input = input;
+ isDone = false;
+ pendingResults = new ArrayList<>();
+ currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ currentCount = 0;
+ pendingCounts = new ArrayList<>();
+ }
+
+ /** Called by implementors of {@link #run}: Fetch the next input element. */
+ @Nullable
+ protected TimestampedValue<InputT> nextInput() {
+ if (!input.hasNext()) {
+ return null;
+ }
+ TimestampedValue<InputT> timestampedInput = input.next();
+ NexmarkUtils.info("input: %s", timestampedInput);
+ return timestampedInput;
+ }
+
+ /**
+ * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of
+ * recording the expected activity of the query over time.
+ */
+ protected void addIntermediateResult(TimestampedValue<OutputT> result) {
+ NexmarkUtils.info("intermediate result: %s", result);
+ updateCounts(result.getTimestamp());
+ }
+
+ /**
+ * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
+ * semantic correctness.
+ */
+ protected void addResult(TimestampedValue<OutputT> result) {
+ NexmarkUtils.info("result: %s", result);
+ pendingResults.add(result);
+ updateCounts(result.getTimestamp());
+ }
+
+ /**
+ * Update window and counts.
+ */
+ private void updateCounts(Instant timestamp) {
+ long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
+ if (window > currentWindow) {
+ if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+ pendingCounts.add(currentCount);
+ }
+ currentCount = 0;
+ currentWindow = window;
+ }
+ currentCount++;
+ }
+
+ /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
+ protected void allDone() {
+ isDone = true;
+ }
+
+ /**
+ * Overridden by derived classes to do the next increment of work. Each call should
+ * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
+ * or {@link #allDone}. It is ok for a single call to emit more than one result via
+ * {@link #addResult}. It is ok for a single call to run the entire simulation, though
+ * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to
+ * stall.
+ */
+ protected abstract void run();
+
+ /**
+ * Return iterator over all expected timestamped results. The underlying simulator state is
+ * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
+ */
+ public Iterator<TimestampedValue<OutputT>> results() {
+ return new Iterator<TimestampedValue<OutputT>>() {
+ @Override
+ public boolean hasNext() {
+ while (true) {
+ if (!pendingResults.isEmpty()) {
+ return true;
+ }
+ if (isDone) {
+ return false;
+ }
+ run();
+ }
+ }
+
+ @Override
+ public TimestampedValue<OutputT> next() {
+ TimestampedValue<OutputT> result = pendingResults.get(0);
+ pendingResults.remove(0);
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /**
+ * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
+ * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be
+ * called.
+ */
+ public Iterator<Long> resultsPerWindow() {
+ return new Iterator<Long>() {
+ @Override
+ public boolean hasNext() {
+ while (true) {
+ if (!pendingCounts.isEmpty()) {
+ return true;
+ }
+ if (isDone) {
+ if (currentCount > 0) {
+ pendingCounts.add(currentCount);
+ currentCount = 0;
+ currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ run();
+ }
+ }
+
+ @Override
+ public Long next() {
+ Long result = pendingCounts.get(0);
+ pendingCounts.remove(0);
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
new file mode 100644
index 0000000..0796ce5
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import org.apache.beam.integration.nexmark.Monitor;
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
+ * multiple queries.
+ */
+public abstract class NexmarkQuery
+ extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
+ public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
+ public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
+ protected static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+
+ /** Predicate to detect a new person event. */
+ protected static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+ new SerializableFunction<Event, Boolean>() {
+ @Override
+ public Boolean apply(Event event) {
+ return event.newPerson != null;
+ }
+ };
+
+ /** DoFn to convert a new person event to a person. */
+ protected static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().newPerson);
+ }
+ };
+
+ /** Predicate to detect a new auction event. */
+ protected static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+ new SerializableFunction<Event, Boolean>() {
+ @Override
+ public Boolean apply(Event event) {
+ return event.newAuction != null;
+ }
+ };
+
+ /** DoFn to convert a new auction event to an auction. */
+ protected static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().newAuction);
+ }
+ };
+
+ /** Predicate to detect a new bid event. */
+ protected static final SerializableFunction<Event, Boolean> IS_BID =
+ new SerializableFunction<Event, Boolean>() {
+ @Override
+ public Boolean apply(Event event) {
+ return event.bid != null;
+ }
+ };
+
+ /** DoFn to convert a bid event to a bid. */
+ protected static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().bid);
+ }
+ };
+
+ /** Transform to key each person by their id. */
+ protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
+ ParDo.of(new DoFn<Person, KV<Long, Person>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().id, c.element()));
+ }
+ });
+
+ /** Transform to key each auction by its id. */
+ protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+ ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().id, c.element()));
+ }
+ });
+
+ /** Transform to key each auction by its seller id. */
+ protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+ ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().seller, c.element()));
+ }
+ });
+
+ /** Transform to key each bid by it's auction id. */
+ protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+ ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(KV.of(c.element().auction, c.element()));
+ }
+ });
+
+ /** Transform to project the auction id from each bid. */
+ protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
+ ParDo.of(new DoFn<Bid, Long>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().auction);
+ }
+ });
+
+ /** Transform to project the price from each bid. */
+ protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
+ ParDo.of(new DoFn<Bid, Long>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element().price);
+ }
+ });
+
+ /** Transform to emit each event with the timestamp embedded within it. */
+ public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+ ParDo.of(new DoFn<Event, Event>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Event e = c.element();
+ if (e.bid != null) {
+ c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
+ } else if (e.newPerson != null) {
+ c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
+ } else if (e.newAuction != null) {
+ c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
+ }
+ }
+ });
+
+ /**
+ * Transform to filter for just the new auction events.
+ */
+ public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
+ new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
+ @Override
+ public PCollection<Auction> expand(PCollection<Event> input) {
+ return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
+ .apply("AsAuction", ParDo.of(AS_AUCTION));
+ }
+ };
+
+ /**
+ * Transform to filter for just the new person events.
+ */
+ public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
+ new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
+ @Override
+ public PCollection<Person> expand(PCollection<Event> input) {
+ return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
+ .apply("AsPerson", ParDo.of(AS_PERSON));
+ }
+ };
+
+ /**
+ * Transform to filter for just the bid events.
+ */
+ public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
+ new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
+ @Override
+ public PCollection<Bid> expand(PCollection<Event> input) {
+ return input.apply("IsBid", Filter.by(IS_BID))
+ .apply("AsBid", ParDo.of(AS_BID));
+ }
+ };
+
+ protected final NexmarkConfiguration configuration;
+ public final Monitor<Event> eventMonitor;
+ public final Monitor<KnownSize> resultMonitor;
+ public final Monitor<Event> endOfStreamMonitor;
+ protected final Counter fatalCounter;
+
+ protected NexmarkQuery(NexmarkConfiguration configuration, String name) {
+ super(name);
+ this.configuration = configuration;
+ if (configuration.debug) {
+ eventMonitor = new Monitor<>(name + ".Events", "event");
+ resultMonitor = new Monitor<>(name + ".Results", "result");
+ endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
+ fatalCounter = Metrics.counter(name , "fatal");
+ } else {
+ eventMonitor = null;
+ resultMonitor = null;
+ endOfStreamMonitor = null;
+ fatalCounter = null;
+ }
+ }
+
+ /**
+ * Implement the actual query. All we know about the result is it has a known encoded size.
+ */
+ protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
+
+ @Override
+ public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) {
+
+ if (configuration.debug) {
+ events =
+ events
+ // Monitor events as they go by.
+ .apply(name + ".Monitor", eventMonitor.getTransform())
+ // Count each type of event.
+ .apply(name + ".Snoop", NexmarkUtils.snoop(name));
+ }
+
+ if (configuration.cpuDelayMs > 0) {
+ // Slow down by pegging one core at 100%.
+ events = events.apply(name + ".CpuDelay",
+ NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
+ }
+
+ if (configuration.diskBusyBytes > 0) {
+ // Slow down by forcing bytes to durable store.
+ events = events.apply(name + ".DiskBusy",
+ NexmarkUtils.<Event>diskBusy(name, configuration.diskBusyBytes));
+ }
+
+ // Run the query.
+ PCollection<KnownSize> queryResults = applyPrim(events);
+
+ if (configuration.debug) {
+ // Monitor results as they go by.
+ queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform());
+ }
+
+ // Timestamp the query results.
+ return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
new file mode 100644
index 0000000..1ad9099
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.integration.nexmark.NexmarkConfiguration;
+import org.apache.beam.integration.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+
+import org.hamcrest.core.IsEqual;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+/**
+ * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
+ * applied against the actual query results to check their consistency with the model.
+ */
+public abstract class NexmarkQueryModel implements Serializable {
+ public final NexmarkConfiguration configuration;
+
+ public NexmarkQueryModel(NexmarkConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /**
+ * Return the start of the most recent window of {@code size} and {@code period} which ends
+ * strictly before {@code timestamp}.
+ */
+ public static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+ long ts = timestamp.getMillis();
+ long p = period.getMillis();
+ long lim = ts - ts % p;
+ long s = size.getMillis();
+ return new Instant(lim - s);
+ }
+
+ /** Convert {@code itr} to strings capturing values, timestamps and order. */
+ protected static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+ List<String> strings = new ArrayList<>();
+ while (itr.hasNext()) {
+ strings.add(itr.next().toString());
+ }
+ return strings;
+ }
+
+ /** Convert {@code itr} to strings capturing values and order. */
+ protected static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+ List<String> strings = new ArrayList<>();
+ while (itr.hasNext()) {
+ strings.add(itr.next().getValue().toString());
+ }
+ return strings;
+ }
+
+ /** Convert {@code itr} to strings capturing values only. */
+ protected static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+ Set<String> strings = new HashSet<>();
+ while (itr.hasNext()) {
+ strings.add(itr.next().getValue().toString());
+ }
+ return strings;
+ }
+
+ /** Return simulator for query. */
+ public abstract AbstractSimulator<?, ?> simulator();
+
+ /** Return sub-sequence of results which are significant for model. */
+ protected Iterable<TimestampedValue<KnownSize>> relevantResults(
+ Iterable<TimestampedValue<KnownSize>> results) {
+ return results;
+ }
+
+ /**
+ * Convert iterator of elements to collection of strings to use when testing coherence of model
+ * against actual query results.
+ */
+ protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
+
+ /** Return assertion to use on results of pipeline for this query. */
+ public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
+ final Collection<String> expectedStrings = toCollection(simulator().results());
+ final String[] expectedStringsArray =
+ expectedStrings.toArray(new String[expectedStrings.size()]);
+
+ return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
+ @Override
+ public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
+ Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+ Assert.assertThat("wrong pipeline output", actualStrings,
+ IsEqual.equalTo(expectedStrings));
+//compare without order
+// Assert.assertThat("wrong pipeline output", actualStrings,
+// IsIterableContainingInAnyOrder.containsInAnyOrder(expectedStringsArray));
+ return null;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
index 84696c4..00a49a8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
@@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.integration.nexmark.model.KnownSize;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
index 991b1d4..6fb6613 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
@@ -20,9 +20,7 @@ package org.apache.beam.integration.nexmark.queries;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Event;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -56,7 +54,7 @@ public class Query0Model extends NexmarkQueryModel {
}
@Override
- protected AbstractSimulator<?, ?> simulator() {
+ public AbstractSimulator<?, ?> simulator() {
return new Simulator(configuration);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
index 0be77ce..8d90b70 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
index d9b3557..c919691 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
@@ -28,7 +28,6 @@ import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Done;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
index a8a61ae..fd936a9 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.BidsPerSession;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
index a5db504..20f45fb 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.BidsPerSession;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
index 58037d3..0388687 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
@@ -21,9 +21,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.Bid;
import org.apache.beam.integration.nexmark.model.Event;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
index 4c8f878..a365b97 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
@@ -18,7 +18,6 @@
package org.apache.beam.integration.nexmark.queries;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQuery;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.AuctionPrice;
import org.apache.beam.integration.nexmark.model.Bid;
http://git-wip-us.apache.org/repos/asf/beam/blob/a39cb800/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
index f578e4c..e00992f 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
@@ -21,9 +21,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.beam.integration.nexmark.AbstractSimulator;
import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkQueryModel;
import org.apache.beam.integration.nexmark.NexmarkUtils;
import org.apache.beam.integration.nexmark.model.AuctionPrice;
import org.apache.beam.integration.nexmark.model.Bid;