You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:57 UTC
[49/55] [abbrv] beam git commit: Move module
beam-integration-java-nexmark to beam-sdks-java-nexmark
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
deleted file mode 100644
index 1395182..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/AbstractSimulator.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.integration.nexmark.queries;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import javax.annotation.Nullable;
-
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Abstract base class for simulator of a query.
- *
- * @param <InputT> Type of input elements.
- * @param <OutputT> Type of output elements.
- */
-public abstract class AbstractSimulator<InputT, OutputT> {
- /** Window size for action bucket sampling. */
- private static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
-
- /** Input event stream we should draw from. */
- private final Iterator<TimestampedValue<InputT>> input;
-
- /** Set to true when no more results. */
- private boolean isDone;
-
- /**
- * Results which have not yet been returned by the {@link #results} iterator.
- */
- private final List<TimestampedValue<OutputT>> pendingResults;
-
- /**
- * Current window timestamp (ms since epoch).
- */
- private long currentWindow;
-
- /**
- * Number of (possibly intermediate) results for the current window.
- */
- private long currentCount;
-
- /**
- * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
- * iterator.
- */
- private final List<Long> pendingCounts;
-
- public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
- this.input = input;
- isDone = false;
- pendingResults = new ArrayList<>();
- currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
- currentCount = 0;
- pendingCounts = new ArrayList<>();
- }
-
- /** Called by implementors of {@link #run}: Fetch the next input element. */
- @Nullable
- TimestampedValue<InputT> nextInput() {
- if (!input.hasNext()) {
- return null;
- }
- TimestampedValue<InputT> timestampedInput = input.next();
- NexmarkUtils.info("input: %s", timestampedInput);
- return timestampedInput;
- }
-
- /**
- * Called by implementors of {@link #run}: Capture an intermediate result, for the purpose of
- * recording the expected activity of the query over time.
- */
- void addIntermediateResult(TimestampedValue<OutputT> result) {
- NexmarkUtils.info("intermediate result: %s", result);
- updateCounts(result.getTimestamp());
- }
-
- /**
- * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
- * semantic correctness.
- */
- void addResult(TimestampedValue<OutputT> result) {
- NexmarkUtils.info("result: %s", result);
- pendingResults.add(result);
- updateCounts(result.getTimestamp());
- }
-
- /**
- * Update window and counts.
- */
- private void updateCounts(Instant timestamp) {
- long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
- if (window > currentWindow) {
- if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
- pendingCounts.add(currentCount);
- }
- currentCount = 0;
- currentWindow = window;
- }
- currentCount++;
- }
-
- /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
- void allDone() {
- isDone = true;
- }
-
- /**
- * Overridden by derived classes to do the next increment of work. Each call should
- * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
- * or {@link #allDone}. It is ok for a single call to emit more than one result via
- * {@link #addResult}. It is ok for a single call to run the entire simulation, though
- * this will cause the {@link #results} and {@link #resultsPerWindow} iterators to
- * stall until that call returns.
- */
- protected abstract void run();
-
- /**
- * Return iterator over all expected timestamped results. The underlying simulator state is
- * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
- */
- public Iterator<TimestampedValue<OutputT>> results() {
- return new Iterator<TimestampedValue<OutputT>>() {
- @Override
- public boolean hasNext() {
- while (true) {
- if (!pendingResults.isEmpty()) {
- return true;
- }
- if (isDone) {
- return false;
- }
- run();
- }
- }
-
- @Override
- public TimestampedValue<OutputT> next() {
- TimestampedValue<OutputT> result = pendingResults.get(0);
- pendingResults.remove(0);
- return result;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- /**
- * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
- * simulator state is changed. Only one of {@link #results} or {@link #resultsPerWindow} can be
- * called.
- */
- public Iterator<Long> resultsPerWindow() {
- return new Iterator<Long>() {
- @Override
- public boolean hasNext() {
- while (true) {
- if (!pendingCounts.isEmpty()) {
- return true;
- }
- if (isDone) {
- if (currentCount > 0) {
- pendingCounts.add(currentCount);
- currentCount = 0;
- currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
- return true;
- } else {
- return false;
- }
- }
- run();
- }
- }
-
- @Override
- public Long next() {
- Long result = pendingCounts.get(0);
- pendingCounts.remove(0);
- return result;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
deleted file mode 100644
index 8b74282..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQuery.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.Monitor;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
- * multiple queries.
- */
-public abstract class NexmarkQuery
- extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
- public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
- public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
- static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
-
- /** Predicate to detect a new person event. */
- private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
- new SerializableFunction<Event, Boolean>() {
- @Override
- public Boolean apply(Event event) {
- return event.newPerson != null;
- }
- };
-
- /** DoFn to convert a new person event to a person. */
- private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().newPerson);
- }
- };
-
- /** Predicate to detect a new auction event. */
- private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
- new SerializableFunction<Event, Boolean>() {
- @Override
- public Boolean apply(Event event) {
- return event.newAuction != null;
- }
- };
-
- /** DoFn to convert a new auction event to an auction. */
- private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().newAuction);
- }
- };
-
- /** Predicate to detect a new bid event. */
- private static final SerializableFunction<Event, Boolean> IS_BID =
- new SerializableFunction<Event, Boolean>() {
- @Override
- public Boolean apply(Event event) {
- return event.bid != null;
- }
- };
-
- /** DoFn to convert a bid event to a bid. */
- private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().bid);
- }
- };
-
- /** Transform to key each person by their id. */
- static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
- ParDo.of(new DoFn<Person, KV<Long, Person>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().id, c.element()));
- }
- });
-
- /** Transform to key each auction by its id. */
- static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
- ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().id, c.element()));
- }
- });
-
- /** Transform to key each auction by its seller id. */
- static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
- ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().seller, c.element()));
- }
- });
-
- /** Transform to key each bid by its auction id. */
- static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
- ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().auction, c.element()));
- }
- });
-
- /** Transform to project the auction id from each bid. */
- static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
- ParDo.of(new DoFn<Bid, Long>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().auction);
- }
- });
-
- /** Transform to project the price from each bid. */
- static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
- ParDo.of(new DoFn<Bid, Long>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().price);
- }
- });
-
- /** Transform to emit each event with the timestamp embedded within it. */
- public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
- ParDo.of(new DoFn<Event, Event>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Event e = c.element();
- if (e.bid != null) {
- c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
- } else if (e.newPerson != null) {
- c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
- } else if (e.newAuction != null) {
- c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
- }
- }
- });
-
- /**
- * Transform to filter for just the new auction events.
- */
- public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
- new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
- @Override
- public PCollection<Auction> expand(PCollection<Event> input) {
- return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
- .apply("AsAuction", ParDo.of(AS_AUCTION));
- }
- };
-
- /**
- * Transform to filter for just the new person events.
- */
- public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
- new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
- @Override
- public PCollection<Person> expand(PCollection<Event> input) {
- return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
- .apply("AsPerson", ParDo.of(AS_PERSON));
- }
- };
-
- /**
- * Transform to filter for just the bid events.
- */
- public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
- new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
- @Override
- public PCollection<Bid> expand(PCollection<Event> input) {
- return input.apply("IsBid", Filter.by(IS_BID))
- .apply("AsBid", ParDo.of(AS_BID));
- }
- };
-
- final NexmarkConfiguration configuration;
- public final Monitor<Event> eventMonitor;
- public final Monitor<KnownSize> resultMonitor;
- private final Monitor<Event> endOfStreamMonitor;
- private final Counter fatalCounter;
-
- NexmarkQuery(NexmarkConfiguration configuration, String name) {
- super(name);
- this.configuration = configuration;
- if (configuration.debug) {
- eventMonitor = new Monitor<>(name + ".Events", "event");
- resultMonitor = new Monitor<>(name + ".Results", "result");
- endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
- fatalCounter = Metrics.counter(name , "fatal");
- } else {
- eventMonitor = null;
- resultMonitor = null;
- endOfStreamMonitor = null;
- fatalCounter = null;
- }
- }
-
- /**
- * Implement the actual query. All we know about the result is it has a known encoded size.
- */
- protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
-
- @Override
- public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) {
-
- if (configuration.debug) {
- events =
- events
- // Monitor events as they go by.
- .apply(name + ".Monitor", eventMonitor.getTransform())
- // Count each type of event.
- .apply(name + ".Snoop", NexmarkUtils.snoop(name));
- }
-
- if (configuration.cpuDelayMs > 0) {
- // Slow down by pegging one core at 100%.
- events = events.apply(name + ".CpuDelay",
- NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
- }
-
- if (configuration.diskBusyBytes > 0) {
- // Slow down by forcing bytes to durable store.
- events = events.apply(name + ".DiskBusy",
- NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes));
- }
-
- // Run the query.
- PCollection<KnownSize> queryResults = applyPrim(events);
-
- if (configuration.debug) {
- // Monitor results as they go by.
- queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform());
- }
-
- // Timestamp the query results.
- return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
deleted file mode 100644
index bfa668b..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/NexmarkQueryModel.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-
-import org.hamcrest.core.IsEqual;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Assert;
-
-/**
- * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
- * applied against the actual query results to check their consistency with the model.
- */
-public abstract class NexmarkQueryModel implements Serializable {
- public final NexmarkConfiguration configuration;
-
- NexmarkQueryModel(NexmarkConfiguration configuration) {
- this.configuration = configuration;
- }
-
- /**
- * Return the start of the most recent window of {@code size} and {@code period} which ends
- * strictly before {@code timestamp}.
- */
- static Instant windowStart(Duration size, Duration period, Instant timestamp) {
- long ts = timestamp.getMillis();
- long p = period.getMillis();
- long lim = ts - ts % p;
- long s = size.getMillis();
- return new Instant(lim - s);
- }
-
- /** Convert {@code itr} to strings capturing values, timestamps and order. */
- static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
- List<String> strings = new ArrayList<>();
- while (itr.hasNext()) {
- strings.add(itr.next().toString());
- }
- return strings;
- }
-
- /** Convert {@code itr} to strings capturing values and order. */
- static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
- List<String> strings = new ArrayList<>();
- while (itr.hasNext()) {
- strings.add(itr.next().getValue().toString());
- }
- return strings;
- }
-
- /** Convert {@code itr} to strings capturing values only. */
- static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
- Set<String> strings = new HashSet<>();
- while (itr.hasNext()) {
- strings.add(itr.next().getValue().toString());
- }
- return strings;
- }
-
- /** Return simulator for query. */
- public abstract AbstractSimulator<?, ?> simulator();
-
- /** Return sub-sequence of results which are significant for model. */
- Iterable<TimestampedValue<KnownSize>> relevantResults(
- Iterable<TimestampedValue<KnownSize>> results) {
- return results;
- }
-
- /**
- * Convert iterator of elements to collection of strings to use when testing coherence of model
- * against actual query results.
- */
- protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
-
- /** Return assertion to use on results of pipeline for this query. */
- public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
- final Collection<String> expectedStrings = toCollection(simulator().results());
-
- return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
- @Override
- public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
- Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
- Assert.assertThat("wrong pipeline output", actualStrings,
- IsEqual.equalTo(expectedStrings));
- return null;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
deleted file mode 100644
index 00a49a8..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 0: Pass events through unchanged. However, force them to do a round trip through
- * serialization so that we measure the impact of the choice of coders.
- */
-public class Query0 extends NexmarkQuery {
- public Query0(NexmarkConfiguration configuration) {
- super(configuration, "Query0");
- }
-
- private PCollection<Event> applyTyped(PCollection<Event> events) {
- final Coder<Event> coder = events.getCoder();
- return events
- // Force round trip through coder.
- .apply(name + ".Serialize",
- ParDo.of(new DoFn<Event, Event>() {
- private final Counter bytesMetric =
- Metrics.counter(name , "bytes");
-
- @ProcessElement
- public void processElement(ProcessContext c) throws CoderException, IOException {
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- coder.encode(c.element(), outStream, Coder.Context.OUTER);
- byte[] byteArray = outStream.toByteArray();
- bytesMetric.inc((long) byteArray.length);
- ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
- Event event = coder.decode(inStream, Coder.Context.OUTER);
- c.output(event);
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
deleted file mode 100644
index e2522b8..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query0Model.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query0}.
- */
-public class Query0Model extends NexmarkQueryModel {
- /**
- * Simulator for query 0.
- */
- private static class Simulator extends AbstractSimulator<Event, Event> {
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- }
-
- @Override
- protected void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- allDone();
- return;
- }
- addResult(timestampedEvent);
- }
- }
-
- public Query0Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValueTimestampOrder(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
deleted file mode 100644
index 8d90b70..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
- * In CQL syntax:
- *
- * <pre>
- * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
- * FROM bid [ROWS UNBOUNDED];
- * </pre>
- *
- * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
- * slowed down.
- */
-public class Query1 extends NexmarkQuery {
- public Query1(NexmarkConfiguration configuration) {
- super(configuration, "Query1");
- }
-
- private PCollection<Bid> applyTyped(PCollection<Event> events) {
- return events
- // Only want the bid events.
- .apply(JUST_BIDS)
-
- // Map the conversion function over all bids.
- .apply(name + ".ToEuros",
- ParDo.of(new DoFn<Bid, Bid>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Bid bid = c.element();
- c.output(new Bid(
- bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra));
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
deleted file mode 100644
index 378d01e..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Done;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterEach;
-import org.apache.beam.sdk.transforms.windowing.AfterFirst;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Query "10", 'Log to sharded files' (Not in original suite.)
- *
- * <p>Every windowSizeSec, save all events from the last period into 5*maxNumWorkers log shards.
- */
-public class Query10 extends NexmarkQuery {
-  private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
-  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
-  private static final int NUM_SHARDS_PER_WORKER = 5;
-  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
-
-  /**
-   * Capture everything we need to know about the records in a single output file.
-   */
-  private static class OutputFile implements Serializable {
-    /** Maximum possible timestamp of records in file. */
-    private final Instant maxTimestamp;
-    /** Shard within window. */
-    private final String shard;
-    /** Index of file in all files in shard. */
-    private final long index;
-    /** Timing of records in this file. */
-    private final PaneInfo.Timing timing;
-    /** Path to file containing records, or {@literal null} if no output required. */
-    @Nullable
-    private final String filename;
-
-    public OutputFile(
-        Instant maxTimestamp,
-        String shard,
-        long index,
-        PaneInfo.Timing timing,
-        @Nullable String filename) {
-      this.maxTimestamp = maxTimestamp;
-      this.shard = shard;
-      this.index = index;
-      this.timing = timing;
-      this.filename = filename;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename);
-    }
-  }
-
-  /**
-   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
-   */
-  @Nullable
-  private String outputPath;
-
-  /**
-   * Maximum number of workers; multiplied by NUM_SHARDS_PER_WORKER to get the shard count.
-   */
-  private int maxNumWorkers;
-
-  public Query10(NexmarkConfiguration configuration) {
-    super(configuration, "Query10");
-  }
-
-  public void setOutputPath(@Nullable String outputPath) {
-    this.outputPath = outputPath;
-  }
-
-  public void setMaxNumWorkers(int maxNumWorkers) {
-    this.maxNumWorkers = maxNumWorkers;
-  }
-
-  /**
-   * Return channel for writing bytes to GCS. Currently always throws; see the TODO below.
-   */
-  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
-      throws IOException {
-    //TODO
-    // Fix after PR: right now this is a specific Google added use case
-    // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way.
-    throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
-  }
-
-  /** Return a short string to describe {@code timing}; throws if no case below matches. */
-  private String timingToString(PaneInfo.Timing timing) {
-    switch (timing) {
-      case EARLY:
-        return "E";
-      case ON_TIME:
-        return "O";
-      case LATE:
-        return "L";
-    }
-    throw new RuntimeException("Unexpected pane timing: " + timing); // no case matched
-  }
-
-  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
-  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
-    @Nullable String filename =
-        outputPath == null
-            ? null
-            : String.format("%s/LOG-%s-%s-%03d-%s-%x",
-                outputPath, window.maxTimestamp(), shard, pane.getIndex(),
-                timingToString(pane.getTiming()),
-                ThreadLocalRandom.current().nextLong());
-    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
-        pane.getTiming(), filename);
-  }
-
-  /**
-   * Return path to which we should write the index for {@code window}, or {@literal null}
-   * if no output required.
-   */
-  @Nullable
-  private String indexPathFor(BoundedWindow window) {
-    if (outputPath == null) {
-      return null;
-    }
-    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
-  }
-
-  private PCollection<Done> applyTyped(PCollection<Event> events) {
-    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER; // used as a modulus below
-    // Stages: shard -> window -> group -> check lateness -> upload -> re-window -> index.
-    return events
-        .apply(name + ".ShardEvents",
-            ParDo.of(new DoFn<Event, KV<String, Event>>() {
-              private final Counter lateCounter = Metrics.counter(name, "actuallyLateEvent");
-              private final Counter onTimeCounter = Metrics.counter(name, "onTimeCounter");
-
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                if (c.element().hasAnnotation("LATE")) {
-                  lateCounter.inc();
-                  LOG.info("Observed late: {}", c.element());
-                } else {
-                  onTimeCounter.inc();
-                }
-                int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
-                String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
-                c.output(KV.of(shard, c.element()));
-              }
-            }))
-        .apply(name + ".WindowEvents",
-            Window.<KV<String, Event>>into(
-                FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-                .triggering(AfterEach.inOrder(
-                    Repeatedly
-                        .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
-                        .orFinally(AfterWatermark.pastEndOfWindow()),
-                    Repeatedly.forever(
-                        AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
-                            AfterProcessingTime.pastFirstElementInPane()
-                                .plusDelayOf(LATE_BATCHING_PERIOD)))))
-                .discardingFiredPanes()
-                // Use a 1 day allowed lateness so that any forgotten hold will stall the
-                // pipeline for that period and be very noticeable.
-                .withAllowedLateness(Duration.standardDays(1)))
-        .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
-        .apply(name + ".CheckForLateEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                KV<String, Iterable<Event>>>() {
-              private final Counter earlyCounter = Metrics.counter(name, "earlyShard");
-              private final Counter onTimeCounter = Metrics.counter(name, "onTimeShard");
-              private final Counter lateCounter = Metrics.counter(name, "lateShard");
-              private final Counter unexpectedLatePaneCounter =
-                  Metrics.counter(name, "ERROR_unexpectedLatePane");
-              private final Counter unexpectedOnTimeElementCounter =
-                  Metrics.counter(name, "ERROR_unexpectedOnTimeElement");
-
-              @ProcessElement
-              public void processElement(ProcessContext c, BoundedWindow window) {
-                int numLate = 0;
-                int numOnTime = 0;
-                for (Event event : c.element().getValue()) {
-                  if (event.hasAnnotation("LATE")) {
-                    numLate++;
-                  } else {
-                    numOnTime++;
-                  }
-                }
-                String shard = c.element().getKey();
-                LOG.info(String.format(
-                    "%s with timestamp %s has %d actually late and %d on-time "
-                        + "elements in pane %s for window %s",
-                    shard, c.timestamp(), numLate, numOnTime, c.pane(),
-                    window.maxTimestamp()));
-                if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
-                  if (numLate == 0) {
-                    LOG.error(
-                        "ERROR! No late events in late pane for {}", shard);
-                    unexpectedLatePaneCounter.inc();
-                  }
-                  if (numOnTime > 0) {
-                    LOG.error(
-                        "ERROR! Have {} on-time events in late pane for {}",
-                        numOnTime, shard);
-                    unexpectedOnTimeElementCounter.inc();
-                  }
-                  lateCounter.inc();
-                } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
-                  if (numOnTime + numLate < configuration.maxLogEvents) {
-                    LOG.error(
-                        "ERROR! Only have {} events in early pane for {}",
-                        numOnTime + numLate, shard);
-                  }
-                  earlyCounter.inc();
-                } else {
-                  onTimeCounter.inc();
-                }
-                c.output(c.element());
-              }
-            }))
-        .apply(name + ".UploadEvents",
-            ParDo.of(new DoFn<KV<String, Iterable<Event>>,
-                KV<Void, OutputFile>>() {
-              private final Counter savedFileCounter = Metrics.counter(name, "savedFile");
-              private final Counter writtenRecordsCounter = Metrics.counter(name, "writtenRecords");
-
-              @ProcessElement
-              public void processElement(ProcessContext c, BoundedWindow window)
-                  throws IOException {
-                String shard = c.element().getKey();
-                GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                OutputFile outputFile = outputFileFor(window, shard, c.pane());
-                LOG.info(String.format(
-                    "Writing %s with record timestamp %s, window timestamp %s, pane %s",
-                    shard, c.timestamp(), window.maxTimestamp(), c.pane()));
-                if (outputFile.filename != null) {
-                  LOG.info("Beginning write to '{}'", outputFile.filename);
-                  int n = 0;
-                  try (OutputStream output =
-                      Channels.newOutputStream(
-                          openWritableGcsFile(options, outputFile.filename))) {
-                    for (Event event : c.element().getValue()) {
-                      Event.CODER.encode(event, output, Coder.Context.OUTER);
-                      writtenRecordsCounter.inc();
-                      if (++n % 10000 == 0) {
-                        LOG.info("So far written {} records to '{}'", n,
-                            outputFile.filename);
-                      }
-                    }
-                  }
-                  LOG.info("Written all {} records to '{}'", n, outputFile.filename);
-                }
-                savedFileCounter.inc();
-                c.output(KV.<Void, OutputFile>of(null, outputFile));
-              }
-            }))
-        // Clear fancy triggering from above.
-        .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
-            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
-            .triggering(AfterWatermark.pastEndOfWindow())
-            // We expect no late data here, but we'll assume the worst so we can detect any.
-            .withAllowedLateness(Duration.standardDays(1))
-            .discardingFiredPanes())
-        // this GroupByKey allows to have one file per window
-        .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
-        .apply(name + ".Index",
-            ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
-              private final Counter unexpectedLateCounter =
-                  Metrics.counter(name, "ERROR_unexpectedLate");
-              private final Counter unexpectedEarlyCounter =
-                  Metrics.counter(name, "ERROR_unexpectedEarly");
-              private final Counter unexpectedIndexCounter =
-                  Metrics.counter(name, "ERROR_unexpectedIndex");
-              private final Counter finalizedCounter = Metrics.counter(name, "indexed");
-
-              @ProcessElement
-              public void processElement(ProcessContext c, BoundedWindow window)
-                  throws IOException {
-                if (c.pane().getTiming() == Timing.LATE) {
-                  unexpectedLateCounter.inc();
-                  LOG.error("ERROR! Unexpected LATE pane: {}", c.pane());
-                } else if (c.pane().getTiming() == Timing.EARLY) {
-                  unexpectedEarlyCounter.inc();
-                  LOG.error("ERROR! Unexpected EARLY pane: {}", c.pane());
-                } else if (c.pane().getTiming() == Timing.ON_TIME
-                    && c.pane().getIndex() != 0) {
-                  unexpectedIndexCounter.inc();
-                  LOG.error("ERROR! Unexpected ON_TIME pane index: {}", c.pane());
-                } else {
-                  GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
-                  LOG.info(
-                      "Index with record timestamp {}, window timestamp {}, pane {}",
-                      c.timestamp(), window.maxTimestamp(), c.pane());
-
-                  @Nullable String filename = indexPathFor(window);
-                  if (filename != null) {
-                    LOG.info("Beginning write to '{}'", filename);
-                    int n = 0;
-                    try (OutputStream output =
-                        Channels.newOutputStream(
-                            openWritableGcsFile(options, filename))) {
-                      for (OutputFile outputFile : c.element().getValue()) {
-                        output.write(outputFile.toString().getBytes("UTF-8"));
-                        n++;
-                      }
-                    }
-                    LOG.info("Written all {} lines to '{}'", n, filename);
-                  }
-                  c.output(
-                      new Done("written for timestamp " + window.maxTimestamp()));
-                  finalizedCounter.inc();
-                }
-              }
-            }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
deleted file mode 100644
index 6db9bcf..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query11.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.BidsPerSession;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query "11", 'User sessions' (Not in original suite.)
- *
- * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap.
- * However limit the session to at most {@code maxLogEvents}. Emit the number of
- * bids per session.
- */
-public class Query11 extends NexmarkQuery {
-  public Query11(NexmarkConfiguration configuration) {
-    super(configuration, "Query11");
-  }
-
-  /** Counts bids per bidder in session windows with a repeated element-count trigger. */
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    Duration gap = Duration.standardSeconds(configuration.windowSizeSec);
-    Duration lateness = Duration.standardSeconds(configuration.occasionalDelaySec / 2);
-    return events
-        .apply(JUST_BIDS)
-        .apply(name + ".Rekey", ParDo.of(new DoFn<Bid, Long>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(c.element().bidder);
-          }
-        }))
-        .apply(Window.<Long>into(Sessions.withGapDuration(gap))
-            .triggering(
-                Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
-            .discardingFiredPanes()
-            .withAllowedLateness(lateness))
-        .apply(Count.<Long>perElement())
-        .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
-          }
-        }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
deleted file mode 100644
index 20f45fb..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query12.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.BidsPerSession;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-
-/**
- * Query "12", 'Processing time windows' (Not in original suite.)
- *
- * <p>Group bids by the same user into processing time windows of windowSize. Emit the count
- * of bids per window.
- */
-public class Query12 extends NexmarkQuery {
-  public Query12(NexmarkConfiguration configuration) {
-    super(configuration, "Query12");
-  }
-
-  /** Counts bids per bidder, emitting counts on a repeated processing-time delay trigger. */
-  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
-    PCollection<Long> bidders = events.apply(JUST_BIDS)
-        .apply(ParDo.of(new DoFn<Bid, Long>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(c.element().bidder);
-          }
-        }));
-    // Global window; a pane fires windowSizeSec of processing time after its first element.
-    Duration delay = Duration.standardSeconds(configuration.windowSizeSec);
-    PCollection<Long> windowed = bidders.apply(
-        Window.<Long>into(new GlobalWindows())
-            .triggering(Repeatedly.forever(
-                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)))
-            .discardingFiredPanes()
-            .withAllowedLateness(Duration.ZERO));
-    return windowed
-        .apply(Count.<Long>perElement())
-        .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            KV<Long, Long> counted = c.element();
-            c.output(new BidsPerSession(counted.getKey(), counted.getValue()));
-          }
-        }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
deleted file mode 100644
index f07db80..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query1Model.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query1}.
- */
-public class Query1Model extends NexmarkQueryModel implements Serializable {
-  /**
-   * Simulator for query 1, fed by {@code NexmarkUtils.standardEventIterator}.
-   */
-  private static class Simulator extends AbstractSimulator<Event, Bid> {
-    public Simulator(NexmarkConfiguration configuration) {
-      super(NexmarkUtils.standardEventIterator(configuration));
-    }
-
-    @Override
-    protected void run() {
-      TimestampedValue<Event> input = nextInput();
-      if (input == null) {
-        allDone();
-        return;
-      }
-      Bid bid = input.getValue().bid;
-      if (bid == null) {
-        // Only bid events produce a result.
-        return;
-      }
-      // Scale the price by 89/100 and keep every other field, stamping the result
-      // with the timestamp of the input event.
-      Bid converted =
-          new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra);
-      TimestampedValue<Bid> output = TimestampedValue.of(converted, input.getTimestamp());
-      addResult(output);
-    }
-  }
-
-  public Query1Model(NexmarkConfiguration configuration) {
-    super(configuration);
-  }
-
-  @Override
-  public AbstractSimulator<?, ?> simulator() {
-    return new Simulator(configuration);
-  }
-
-  @Override
-  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
-    return toValueTimestampOrder(itr);
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
deleted file mode 100644
index a365b97..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.AuctionPrice;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Query 2, 'Filtering'. Find bids with specific auction ids and show their bid price.
- * In CQL syntax:
- *
- * <pre>
- * SELECT Rstream(auction, price)
- * FROM Bid [NOW]
- * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
- * </pre>
- *
- * <p>As written that query will only yield a few hundred results over event streams of
- * arbitrary size. To make it more interesting we instead choose bids for every
- * {@code auctionSkip}'th auction.
- */
-public class Query2 extends NexmarkQuery {
-  public Query2(NexmarkConfiguration configuration) {
-    super(configuration, "Query2");
-  }
-
-  /** Projects bids on every {@code auctionSkip}'th auction id to (auction, price) pairs. */
-  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
-    PCollection<Bid> bids = events.apply(JUST_BIDS);
-
-    // Keep only bids whose auction id is a multiple of auctionSkip.
-    PCollection<Bid> selected = bids.apply(
-        Filter.by(new SerializableFunction<Bid, Boolean>() {
-          @Override
-          public Boolean apply(Bid bid) {
-            return bid.auction % configuration.auctionSkip == 0;
-          }
-        }));
-
-    // Reduce each surviving bid to its auction id and price.
-    return selected.apply(name + ".Project",
-        ParDo.of(new DoFn<Bid, AuctionPrice>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            Bid bid = c.element();
-            c.output(new AuctionPrice(bid.auction, bid.price));
-          }
-        }));
-  }
-
-  @Override
-  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
-    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
-  }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
deleted file mode 100644
index e00992f..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query2Model.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.AuctionPrice;
-import org.apache.beam.integration.nexmark.model.Bid;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.sdk.values.TimestampedValue;
-
-/**
- * A direct implementation of {@link Query2}.
- */
-public class Query2Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 2.
- */
- private class Simulator extends AbstractSimulator<Event, AuctionPrice> {
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- }
-
- @Override
- protected void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- allDone();
- return;
- }
- Event event = timestampedEvent.getValue();
- if (event.bid == null) {
- // Ignore non bid events.
- return;
- }
- Bid bid = event.bid;
- if (bid.auction % configuration.auctionSkip != 0) {
- // Ignore bids for auctions we don't care about.
- return;
- }
- AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price);
- TimestampedValue<AuctionPrice> result =
- TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp());
- addResult(result);
- }
- }
-
- public Query2Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValueTimestampOrder(itr);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
deleted file mode 100644
index f2b66d7..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.KnownSize;
-import org.apache.beam.integration.nexmark.model.NameCityStateId;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.state.TimerSpec;
-import org.apache.beam.sdk.state.TimerSpecs;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Query 3, 'Local Item Suggestion'. Who is selling in OR, ID or CA in category 10, and for what
- * auction ids? In CQL syntax:
- *
- * <pre>
- * SELECT Istream(P.name, P.city, P.state, A.id)
- * FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
- * WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category
- * = 10;
- * </pre>
- *
- * <p>We'll implement this query to allow 'new auction' events to come before the 'new person'
- * events for the auction seller. Those auctions will be stored until the matching person is seen.
- * Then all subsequent auctions for a person will use the stored person record.
- *
- * <p>A real system would use an external system to maintain the id-to-person association.
- */
-public class Query3 extends NexmarkQuery {
-
- private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
- private final JoinDoFn joinDoFn;
-
- public Query3(NexmarkConfiguration configuration) {
- super(configuration, "Query3");
- joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime);
- }
-
- private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
- int numEventsInPane = 30;
-
- PCollection<Event> eventsWindowed =
- events.apply(
- Window.<Event>into(new GlobalWindows())
- .triggering(Repeatedly.forever((AfterPane.elementCountAtLeast(numEventsInPane))))
- .discardingFiredPanes()
- .withAllowedLateness(Duration.ZERO));
- PCollection<KV<Long, Auction>> auctionsBySellerId =
- eventsWindowed
- // Only want the new auction events.
- .apply(JUST_NEW_AUCTIONS)
-
- // We only want auctions in category 10.
- .apply(
- name + ".InCategory",
- Filter.by(
- new SerializableFunction<Auction, Boolean>() {
-
- @Override
- public Boolean apply(Auction auction) {
- return auction.category == 10;
- }
- }))
-
- // Key auctions by their seller id.
- .apply("AuctionBySeller", AUCTION_BY_SELLER);
-
- PCollection<KV<Long, Person>> personsById =
- eventsWindowed
- // Only want the new people events.
- .apply(JUST_NEW_PERSONS)
-
- // We only want people in OR, ID, CA.
- .apply(
- name + ".InState",
- Filter.by(
- new SerializableFunction<Person, Boolean>() {
-
- @Override
- public Boolean apply(Person person) {
- return person.state.equals("OR")
- || person.state.equals("ID")
- || person.state.equals("CA");
- }
- }))
-
- // Key people by their id.
- .apply("PersonById", PERSON_BY_ID);
-
- return
- // Join auctions and people.
- // concatenate KeyedPCollections
- KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
- .and(PERSON_TAG, personsById)
- // group auctions and persons by personId
- .apply(CoGroupByKey.<Long>create())
- .apply(name + ".Join", ParDo.of(joinDoFn))
-
- // Project what we want.
- .apply(
- name + ".Project",
- ParDo.of(
- new DoFn<KV<Auction, Person>, NameCityStateId>() {
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- Auction auction = c.element().getKey();
- Person person = c.element().getValue();
- c.output(
- new NameCityStateId(person.name, person.city, person.state, auction.id));
- }
- }));
- }
-
- @Override
- protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
- return NexmarkUtils.castToKnownSize(name, applyTyped(events));
- }
-
- /**
- * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at
- * a time.
- *
- * <p>We know a person may submit any number of auctions. Thus new person event must have the
- * person record stored in persistent state in order to match future auctions by that person.
- *
- * <p>However we know that each auction is associated with at most one person, so only need to
- * store auction records in persistent state until we have seen the corresponding person record.
- * And of course may have already seen that record.
- */
- private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
-
- private final int maxAuctionsWaitingTime;
- private static final String AUCTIONS = "auctions";
- private static final String PERSON = "person";
-
- @StateId(PERSON)
- private static final StateSpec<ValueState<Person>> personSpec =
- StateSpecs.value(Person.CODER);
-
- private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
-
- @StateId(AUCTIONS)
- private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
- StateSpecs.value(ListCoder.of(Auction.CODER));
-
- @TimerId(PERSON_STATE_EXPIRING)
- private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- // Used to refer the metrics namespace
- private final String name;
-
- private final Counter newAuctionCounter;
- private final Counter newPersonCounter;
- private final Counter newNewOutputCounter;
- private final Counter newOldOutputCounter;
- private final Counter oldNewOutputCounter;
- private final Counter fatalCounter;
-
- private JoinDoFn(String name, int maxAuctionsWaitingTime) {
- this.name = name;
- this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
- newAuctionCounter = Metrics.counter(name, "newAuction");
- newPersonCounter = Metrics.counter(name, "newPerson");
- newNewOutputCounter = Metrics.counter(name, "newNewOutput");
- newOldOutputCounter = Metrics.counter(name, "newOldOutput");
- oldNewOutputCounter = Metrics.counter(name, "oldNewOutput");
- fatalCounter = Metrics.counter(name , "fatal");
- }
-
- @ProcessElement
- public void processElement(
- ProcessContext c,
- @TimerId(PERSON_STATE_EXPIRING) Timer timer,
- @StateId(PERSON) ValueState<Person> personState,
- @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
- // We would *almost* implement this by rewindowing into the global window and
- // running a combiner over the result. The combiner's accumulator would be the
- // state we use below. However, combiners cannot emit intermediate results, thus
- // we need to wait for the pending ReduceFn API.
-
- Person existingPerson = personState.read();
- if (existingPerson != null) {
- // We've already seen the new person event for this person id.
- // We can join with any new auctions on-the-fly without needing any
- // additional persistent state.
- for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.inc();
- newOldOutputCounter.inc();
- c.output(KV.of(newAuction, existingPerson));
- }
- return;
- }
-
- Person theNewPerson = null;
- for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
- if (theNewPerson == null) {
- theNewPerson = newPerson;
- } else {
- if (theNewPerson.equals(newPerson)) {
- LOG.error("Duplicate person {}", theNewPerson);
- } else {
- LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson);
- }
- fatalCounter.inc();
- continue;
- }
- newPersonCounter.inc();
- // We've now seen the person for this person id so can flush any
- // pending auctions for the same seller id (an auction is done by only one seller).
- List<Auction> pendingAuctions = auctionsState.read();
- if (pendingAuctions != null) {
- for (Auction pendingAuction : pendingAuctions) {
- oldNewOutputCounter.inc();
- c.output(KV.of(pendingAuction, newPerson));
- }
- auctionsState.clear();
- }
- // Also deal with any new auctions.
- for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.inc();
- newNewOutputCounter.inc();
- c.output(KV.of(newAuction, newPerson));
- }
- // Remember this person for any future auctions.
- personState.write(newPerson);
- //set a time out to clear this state
- Instant firingTime = new Instant(newPerson.dateTime)
- .plus(Duration.standardSeconds(maxAuctionsWaitingTime));
- timer.set(firingTime);
- }
- if (theNewPerson != null) {
- return;
- }
-
- // We'll need to remember the auctions until we see the corresponding
- // new person event.
- List<Auction> pendingAuctions = auctionsState.read();
- if (pendingAuctions == null) {
- pendingAuctions = new ArrayList<>();
- }
- for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
- newAuctionCounter.inc();
- pendingAuctions.add(newAuction);
- }
- auctionsState.write(pendingAuctions);
- }
-
- @OnTimer(PERSON_STATE_EXPIRING)
- public void onTimerCallback(
- OnTimerContext context,
- @StateId(PERSON) ValueState<Person> personState) {
- personState.clear();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
deleted file mode 100644
index f415709..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query3Model.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark.queries;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.beam.integration.nexmark.NexmarkConfiguration;
-import org.apache.beam.integration.nexmark.NexmarkUtils;
-import org.apache.beam.integration.nexmark.model.Auction;
-import org.apache.beam.integration.nexmark.model.Event;
-import org.apache.beam.integration.nexmark.model.NameCityStateId;
-import org.apache.beam.integration.nexmark.model.Person;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-
-/**
- * A direct implementation of {@link Query3}.
- */
-public class Query3Model extends NexmarkQueryModel implements Serializable {
- /**
- * Simulator for query 3.
- */
- private static class Simulator extends AbstractSimulator<Event, NameCityStateId> {
- /** Auctions, indexed by seller id. */
- private final Multimap<Long, Auction> newAuctions;
-
- /** Persons, indexed by id. */
- private final Map<Long, Person> newPersons;
-
- public Simulator(NexmarkConfiguration configuration) {
- super(NexmarkUtils.standardEventIterator(configuration));
- newPersons = new HashMap<>();
- newAuctions = ArrayListMultimap.create();
- }
-
- /**
- * Capture new result.
- */
- private void addResult(Auction auction, Person person, Instant timestamp) {
- TimestampedValue<NameCityStateId> result = TimestampedValue.of(
- new NameCityStateId(person.name, person.city, person.state, auction.id), timestamp);
- addResult(result);
- }
-
- @Override
- protected void run() {
- TimestampedValue<Event> timestampedEvent = nextInput();
- if (timestampedEvent == null) {
- allDone();
- return;
- }
- Event event = timestampedEvent.getValue();
- if (event.bid != null) {
- // Ignore bid events.
- return;
- }
-
- Instant timestamp = timestampedEvent.getTimestamp();
-
- if (event.newAuction != null) {
- // Only want auctions in category 10.
- if (event.newAuction.category == 10) {
- // Join new auction with existing person, if any.
- Person person = newPersons.get(event.newAuction.seller);
- if (person != null) {
- addResult(event.newAuction, person, timestamp);
- } else {
- // Remember auction for future new person event.
- newAuctions.put(event.newAuction.seller, event.newAuction);
- }
- }
- } else {
- // Only want people in OR, ID or CA.
- if (event.newPerson.state.equals("OR") || event.newPerson.state.equals("ID")
- || event.newPerson.state.equals("CA")) {
- // Join new person with existing auctions.
- for (Auction auction : newAuctions.get(event.newPerson.id)) {
- addResult(auction, event.newPerson, timestamp);
- }
- // We'll never need these auctions again.
- newAuctions.removeAll(event.newPerson.id);
- // Remember person for future auctions.
- newPersons.put(event.newPerson.id, event.newPerson);
- }
- }
- }
- }
-
- public Query3Model(NexmarkConfiguration configuration) {
- super(configuration);
- }
-
- @Override
- public AbstractSimulator<?, ?> simulator() {
- return new Simulator(configuration);
- }
-
- @Override
- protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
- return toValue(itr);
- }
-}