You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:11 UTC
[03/55] [abbrv] beam git commit: NexMark
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
new file mode 100644
index 0000000..13ed580
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Odds and ends used throughout the queries and the driver.
+ */
+public class NexmarkUtils {
+  /**
+   * Logger used by the {@link #info} and {@link #error} helpers. It is named after this
+   * class so that messages are attributed to the utilities which actually emit them.
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class);
+
+  /**
+   * Mapper for (de)serializing JSON.
+   */
+  static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /**
+ * Possible sources for events. The value is chosen through the {@code sourceType}
+ * option (see {@code Options#getSourceType}).
+ */
+ public enum SourceType {
+ /**
+ * Produce events directly.
+ */
+ DIRECT,
+ /**
+ * Read events from an Avro file.
+ */
+ AVRO,
+ /**
+ * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
+ */
+ PUBSUB
+ }
+
+ /**
+ * Possible sinks for query results. The value is chosen through the {@code sinkType}
+ * option (see {@code Options#getSinkType}).
+ */
+ public enum SinkType {
+ /**
+ * Discard all results.
+ */
+ COUNT_ONLY,
+ /**
+ * Discard all results after converting them to strings.
+ */
+ DEVNULL,
+ /**
+ * Write to a PubSub topic. It will be drained by this pipeline.
+ */
+ PUBSUB,
+ /**
+ * Write to a text file. Only works in batch mode.
+ */
+ TEXT,
+ /**
+ * Write raw Events to Avro. Only works in batch mode.
+ */
+ AVRO,
+ /**
+ * Write raw Events to BigQuery.
+ */
+ BIGQUERY,
+ }
+
+ /**
+ * Pub/sub mode to run in. According to the description of the {@code pubSubMode}
+ * option this only applies when the source is {@code PUBSUB}.
+ */
+ public enum PubSubMode {
+ /**
+ * Publish events to pub/sub, but don't run the query.
+ */
+ PUBLISH_ONLY,
+ /**
+ * Consume events from pub/sub and run the query, but don't publish.
+ */
+ SUBSCRIBE_ONLY,
+ /**
+ * Both publish and consume, but as separate jobs.
+ */
+ COMBINED
+ }
+
+ /**
+ * Coder strategies. The strategy is applied to a pipeline's coder registry by
+ * {@link #setupPipeline}.
+ */
+ public enum CoderStrategy {
+ /**
+ * Hand-written: the {@code CODER} constant of each model class is registered explicitly.
+ */
+ HAND,
+ /**
+ * Avro: {@code AvroCoder.PROVIDER} is installed as the fallback coder provider.
+ */
+ AVRO,
+ /**
+ * Java serialization: {@code SerializableCoder.PROVIDER} is installed as the fallback
+ * coder provider.
+ */
+ JAVA
+ }
+
+ /**
+ * How to determine resource names. The {@code resourceNameMode} option defaults to
+ * {@link #QUERY_AND_SALT}.
+ */
+ public enum ResourceNameMode {
+ /** Names are used as provided. */
+ VERBATIM,
+ /** Names are suffixed with the query being run. */
+ QUERY,
+ /** Names are suffixed with the query being run and a random number. */
+ QUERY_AND_SALT;
+ }
+
+ /**
+ * Units for rates.
+ */
+ public enum RateUnit {
+ PER_SECOND(1_000_000L),
+ PER_MINUTE(60_000_000L);
+
+ RateUnit(long usPerUnit) {
+ this.usPerUnit = usPerUnit;
+ }
+
+ /**
+ * Number of microseconds per unit.
+ */
+ private final long usPerUnit;
+
+ /**
+ * Number of microseconds between events at given rate.
+ */
+ public long rateToPeriodUs(long rate) {
+ return (usPerUnit + rate / 2) / rate;
+ }
+ }
+
+  /**
+   * Shape of the event rate over time.
+   */
+  public static enum RateShape {
+    SQUARE,
+    SINE;
+
+    /**
+     * Number of steps used to approximate a sine wave.
+     */
+    private static final int N = 10;
+
+    /**
+     * Return the inter-event delay, in microseconds, which each of {@code numGenerators}
+     * generators should follow so that together they achieve {@code rate} at {@code unit}.
+     */
+    public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) {
+      long overallPeriodUs = unit.rateToPeriodUs(rate);
+      return overallPeriodUs * numGenerators;
+    }
+
+    /**
+     * Return the successive inter-event delays, in microseconds, which each of
+     * {@code numGenerators} generators should cycle through in order to follow this shape
+     * between {@code firstRate} and {@code nextRate} at {@code unit}.
+     *
+     * <p>The result has one entry when both rates are equal, two entries for
+     * {@link #SQUARE} and {@code N} entries for {@link #SINE}.
+     */
+    public long[] interEventDelayUs(
+        int firstRate, int nextRate, RateUnit unit, int numGenerators) {
+      if (firstRate == nextRate) {
+        // Constant rate: one delay is enough whatever the shape.
+        return new long[] {unit.rateToPeriodUs(firstRate) * numGenerators};
+      }
+
+      if (this == SQUARE) {
+        // Alternate between the two rates.
+        return new long[] {
+          unit.rateToPeriodUs(firstRate) * numGenerators,
+          unit.rateToPeriodUs(nextRate) * numGenerators
+        };
+      }
+
+      if (this == SINE) {
+        double mid = (firstRate + nextRate) / 2.0;
+        double amp = (firstRate - nextRate) / 2.0; // may be negative
+        long[] delaysUs = new long[N];
+        for (int step = 0; step < N; step++) {
+          // Sample one full cosine period in N equal steps, starting at firstRate.
+          double phase = (2.0 * Math.PI * step) / N;
+          double stepRate = mid + amp * Math.cos(phase);
+          delaysUs[step] = unit.rateToPeriodUs(Math.round(stepRate)) * numGenerators;
+        }
+        return delaysUs;
+      }
+
+      throw new RuntimeException(); // every shape should have been handled above
+    }
+
+    /**
+     * Return the delay between steps, in seconds, for the result of
+     * {@link #interEventDelayUs}, so as to cycle through the entire sequence every
+     * {@code ratePeriodSec}. The division rounds up.
+     */
+    public int stepLengthSec(int ratePeriodSec) {
+      int steps;
+      if (this == SQUARE) {
+        steps = 2;
+      } else if (this == SINE) {
+        steps = N;
+      } else {
+        steps = 0;
+      }
+      return (ratePeriodSec + steps - 1) / steps;
+    }
+  }
+
+ /**
+ * Set to true to capture all info messages. The logging level flags don't currently work.
+ * Checked by {@link #info}; while this is false that method does nothing.
+ */
+ private static final boolean LOG_INFO = false;
+
+ /**
+ * Set to true to capture all error messages. The logging level flags don't currently work.
+ * Checked by {@link #error}.
+ */
+ private static final boolean LOG_ERROR = true;
+
+ /**
+ * Set to true to log directly to stdout on VM. You can watch the results in real-time with:
+ * tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
+ * Only consulted after the corresponding {@code LOG_INFO}/{@code LOG_ERROR} flag has
+ * already allowed the message.
+ */
+ private static final boolean LOG_TO_CONSOLE = false;
+
+ /**
+ * Log info message.
+ */
+ public static void info(String format, Object... args) {
+ if (LOG_INFO) {
+ LOG.info(String.format(format, args));
+ if (LOG_TO_CONSOLE) {
+ System.out.println(String.format(format, args));
+ }
+ }
+ }
+
+ /**
+ * Log error message.
+ */
+ public static void error(String format, Object... args) {
+ if (LOG_ERROR) {
+ LOG.error(String.format(format, args));
+ if (LOG_TO_CONSOLE) {
+ System.out.println(String.format(format, args));
+ }
+ }
+ }
+
+ /**
+ * Log message to console. For client side only.
+ */
+ public static void console(String format, Object... args) {
+ System.out.printf("%s %s\n", Instant.now(), String.format(format, args));
+ }
+
+ /**
+ * Label to use for timestamps on pub/sub messages.
+ */
+ public static final String PUBSUB_TIMESTAMP = "timestamp";
+
+ /**
+ * Label to use for windmill ids on pub/sub messages.
+ */
+ public static final String PUBSUB_ID = "id";
+
+ /**
+ * All events will be given a timestamp relative to this time (ms since epoch).
+ * Used by {@link #standardGeneratorConfig} unless wallclock event time is requested.
+ */
+ public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
+
+ /**
+ * Instants guaranteed to be strictly before and after all event timestamps, and which won't
+ * be subject to underflow/overflow. {@code BEGINNING_OF_TIME} is 365 days after the epoch.
+ */
+ public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365));
+ // 365 days before the largest timestamp a window may carry.
+ public static final Instant END_OF_TIME =
+ BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365));
+
+ /**
+ * Setup pipeline with codes and some other options.
+ */
+ public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
+ PipelineRunner<?> runner = p.getRunner();
+ if (runner instanceof DirectPipelineRunner) {
+ // Disable randomization of output since we want to check batch and streaming match the
+ // model both locally and on the cloud.
+ ((DirectPipelineRunner) runner).withUnorderednessTesting(false);
+ }
+
+ CoderRegistry registry = p.getCoderRegistry();
+ switch (coderStrategy) {
+ case HAND:
+ registry.registerCoder(Auction.class, Auction.CODER);
+ registry.registerCoder(AuctionBid.class, AuctionBid.CODER);
+ registry.registerCoder(AuctionCount.class, AuctionCount.CODER);
+ registry.registerCoder(AuctionPrice.class, AuctionPrice.CODER);
+ registry.registerCoder(Bid.class, Bid.CODER);
+ registry.registerCoder(CategoryPrice.class, CategoryPrice.CODER);
+ registry.registerCoder(Event.class, Event.CODER);
+ registry.registerCoder(IdNameReserve.class, IdNameReserve.CODER);
+ registry.registerCoder(NameCityStateId.class, NameCityStateId.CODER);
+ registry.registerCoder(Person.class, Person.CODER);
+ registry.registerCoder(SellerPrice.class, SellerPrice.CODER);
+ registry.registerCoder(Done.class, Done.CODER);
+ registry.registerCoder(BidsPerSession.class, BidsPerSession.CODER);
+ break;
+ case AVRO:
+ registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
+ break;
+ case JAVA:
+ registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
+ break;
+ }
+ }
+
+  /**
+   * Return a generator config to match the given {@code configuration}. Event times start at
+   * the current wallclock time when {@code configuration.useWallclockEventTime} is set, and at
+   * {@link #BASE_TIME} otherwise.
+   */
+  public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
+    long baseTime;
+    if (configuration.useWallclockEventTime) {
+      baseTime = System.currentTimeMillis();
+    } else {
+      baseTime = BASE_TIME;
+    }
+    return new GeneratorConfig(configuration, baseTime, 0, configuration.numEvents, 0);
+  }
+
+ /**
+ * Return an iterator of events using the 'standard' generator config.
+ */
+ public static Iterator<TimestampedValue<Event>> standardEventIterator(
+ NexmarkConfiguration configuration) {
+ return new Generator(standardGeneratorConfig(configuration));
+ }
+
+ /**
+ * Return a transform which yields a finite number of synthesized events generated
+ * as a batch.
+ */
+ public static PTransform<PInput, PCollection<Event>> batchEventsSource(
+ String name, NexmarkConfiguration configuration) {
+ return Read
+ .from(new BoundedEventSource(
+ NexmarkUtils.standardGeneratorConfig(configuration), configuration.numEventGenerators))
+ .named(name + ".ReadBounded");
+ }
+
+ /**
+ * Return a transform which yields a finite number of synthesized events generated
+ * on-the-fly in real time.
+ */
+ public static PTransform<PInput, PCollection<Event>> streamEventsSource(
+ String name, NexmarkConfiguration configuration) {
+ return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration),
+ configuration.numEventGenerators,
+ configuration.watermarkHoldbackSec,
+ configuration.isRateLimited))
+ .named(name + ".ReadUnbounded");
+ }
+
+ /**
+ * Return a transform to pass-through events, but count them as they go by.
+ */
+ public static ParDo.Bound<Event, Event> snoop(final String name) {
+ return ParDo.named(name + ".Snoop")
+ .of(new DoFn<Event, Event>() {
+ final Aggregator<Long, Long> eventCounter =
+ createAggregator("events", new SumLongFn());
+ final Aggregator<Long, Long> newPersonCounter =
+ createAggregator("newPersons", new SumLongFn());
+ final Aggregator<Long, Long> newAuctionCounter =
+ createAggregator("newAuctions", new SumLongFn());
+ final Aggregator<Long, Long> bidCounter =
+ createAggregator("bids", new SumLongFn());
+ final Aggregator<Long, Long> endOfStreamCounter =
+ createAggregator("endOfStream", new SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ eventCounter.addValue(1L);
+ if (c.element().newPerson != null) {
+ newPersonCounter.addValue(1L);
+ } else if (c.element().newAuction != null) {
+ newAuctionCounter.addValue(1L);
+ } else if (c.element().bid != null) {
+ bidCounter.addValue(1L);
+ } else {
+ endOfStreamCounter.addValue(1L);
+ }
+ info("%s snooping element %s", name, c.element());
+ c.output(c.element());
+ }
+ });
+ }
+
+ /**
+ * Return a transform to count and discard each element.
+ */
+ public static <T> ParDo.Bound<T, Void> devNull(String name) {
+ return ParDo.named(name + ".DevNull")
+ .of(new DoFn<T, Void>() {
+ final Aggregator<Long, Long> discardCounter =
+ createAggregator("discarded", new SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ discardCounter.addValue(1L);
+ }
+ });
+ }
+
+ /**
+ * Return a transform to log each element, passing it through unchanged.
+ */
+ public static <T> ParDo.Bound<T, T> log(final String name) {
+ return ParDo.named(name + ".Log")
+ .of(new DoFn<T, T>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ error("%s: %s", name, c.element());
+ c.output(c.element());
+ }
+ });
+ }
+
+ /**
+ * Return a transform to format each element as a string.
+ */
+ public static <T> ParDo.Bound<T, String> format(String name) {
+ return ParDo.named(name + ".Format")
+ .of(new DoFn<T, String>() {
+ final Aggregator<Long, Long> recordCounter =
+ createAggregator("records", new SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ recordCounter.addValue(1L);
+ c.output(c.element().toString());
+ }
+ });
+ }
+
+ /**
+ * Return a transform to make explicit the timestamp of each element.
+ */
+ public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) {
+ return ParDo.named(name + ".Stamp")
+ .of(new DoFn<T, TimestampedValue<T>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(TimestampedValue.of(c.element(), c.timestamp()));
+ }
+ });
+ }
+
+  /**
+   * Return a transform which reduces a stream to a single, order-invariant long hash.
+   *
+   * <p>Each element is hashed together with its timestamp, and the per-element hashes are
+   * combined with XOR, which is what makes the result independent of element order.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<Long>> hash(
+      final long numEvents, String name) {
+    return new PTransform<PCollection<T>, PCollection<Long>>(name) {
+      @Override
+      public PCollection<Long> apply(PCollection<T> input) {
+        // Stage 1: gather everything into the global window, firing on element count.
+        PCollection<T> windowed =
+            input.apply(
+                Window.<T>into(new GlobalWindows())
+                    .triggering(AfterPane.elementCountAtLeast((int) numEvents))
+                    .withAllowedLateness(Duration.standardDays(1))
+                    .discardingFiredPanes());
+
+        // Stage 2: hash each element's timestamp and string form.
+        PCollection<Long> hashes =
+            windowed.apply(
+                ParDo.named(name + ".Hash")
+                    .of(new DoFn<T, Long>() {
+                      @Override
+                      public void processElement(ProcessContext c) {
+                        long timestampMs = c.timestamp().getMillis();
+                        String text = c.element().toString();
+                        long elementHash =
+                            Hashing.murmur3_128()
+                                .newHasher()
+                                .putLong(timestampMs)
+                                .putString(text, StandardCharsets.UTF_8)
+                                .hash()
+                                .asLong();
+                        c.output(elementHash);
+                      }
+                    }));
+
+        // Stage 3: fold all hashes together.
+        return hashes.apply(
+            Combine.globally(
+                new Combine.BinaryCombineFn<Long>() {
+                  @Override
+                  public Long apply(Long left, Long right) {
+                    return left ^ right;
+                  }
+                }));
+      }
+    };
+  }
+
+ private static final long MASK = (1L << 16) - 1L;
+ private static final long HASH = 0x243F6A8885A308D3L;
+ private static final long INIT_PLAINTEXT = 50000L;
+
+ /**
+ * Return a transform to keep the CPU busy for given milliseconds on every record.
+ */
+ public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) {
+ return ParDo.named(name + ".CpuDelay")
+ .of(new DoFn<T, T>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ long now = System.currentTimeMillis();
+ long end = now + delayMs;
+ while (now < end) {
+ // Find plaintext which hashes to HASH in lowest MASK bits.
+ // Values chosen to roughly take 1ms on typical workstation.
+ long p = INIT_PLAINTEXT;
+ while (true) {
+ long t = Hashing.murmur3_128().hashLong(p).asLong();
+ if ((t & MASK) == (HASH & MASK)) {
+ break;
+ }
+ p++;
+ }
+ long next = System.currentTimeMillis();
+ now = next;
+ }
+ c.output(c.element());
+ }
+ });
+ }
+
+  /** State cell which {@link #diskBusy} overwrites with filler bytes. */
+  private static final StateTag<Object, ValueState<byte[]>> DUMMY_TAG =
+      StateTags.value("dummy", ByteArrayCoder.of());
+  /** Largest chunk, in bytes, written by a single state write (16 MiB). */
+  private static final int MAX_BUFFER_SIZE = 1 << 24;
+
+  /**
+   * Return a transform which writes {@code bytes} bytes to durable store on every record
+   * before passing the record through unchanged.
+   *
+   * <p>The bytes are written to the {@code DUMMY_TAG} value state in the global state
+   * namespace, in chunks of at most {@code MAX_BUFFER_SIZE}. Every chunk overwrites the
+   * previous one. Nothing is written when {@code bytes} is zero or negative.
+   *
+   * @param name prefix for the transform name; {@code ".DiskBusy"} is appended
+   * @param bytes number of bytes to write for each record
+   */
+  public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) {
+    return ParDo.named(name + ".DiskBusy")
+        .of(new DoFn<T, T>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            long remain = bytes;
+            long now = System.currentTimeMillis();
+            while (remain > 0) {
+              long thisBytes = Math.min(remain, MAX_BUFFER_SIZE);
+              remain -= thisBytes;
+              // The cast is safe: thisBytes never exceeds MAX_BUFFER_SIZE, an int.
+              byte[] arr = new byte[(int) thisBytes];
+              // Fill with the low byte of the clock so successive chunks differ.
+              java.util.Arrays.fill(arr, (byte) now);
+              ValueState<byte[]> state = c.windowingInternals().stateInternals().state(
+                  StateNamespaces.global(), DUMMY_TAG);
+              state.write(arr);
+              now = System.currentTimeMillis();
+            }
+            c.output(c.element());
+          }
+        });
+  }
+
+ /**
+ * Return a transform to cast each element to {@link KnownSize}.
+ */
+ private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize(
+ final String name) {
+ return ParDo.named(name + ".Forget")
+ .of(new DoFn<T, KnownSize>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ });
+ }
+
+  /**
+   * A coder for instances of {@code T} which have been cast up to {@link KnownSize}. All
+   * encoding and decoding is delegated to the coder for the true type.
+   *
+   * @param <T> True type of object.
+   */
+  private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> {
+    private final Coder<T> delegate;
+
+    public CastingCoder(Coder<T> trueCoder) {
+      delegate = trueCoder;
+    }
+
+    @Override
+    public void encode(KnownSize value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      // Values handled by this coder are expected to be of the true type.
+      @SuppressWarnings("unchecked")
+      T downcast = (T) value;
+      delegate.encode(downcast, outStream, context);
+    }
+
+    @Override
+    public KnownSize decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      T decoded = delegate.decode(inStream, context);
+      return decoded;
+    }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return ImmutableList.of(delegate);
+    }
+  }
+
+  /**
+   * Return a coder for {@code KnownSize} values which are known to be exactly of type
+   * {@code T}, backed by {@code trueCoder}.
+   */
+  private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) {
+    CastingCoder<T> castingCoder = new CastingCoder<T>(trueCoder);
+    return castingCoder;
+  }
+
+  /**
+   * Return {@code elements} as {@code KnownSize}s. The result is encoded with a coder which
+   * wraps the coder of {@code elements}.
+   */
+  public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize(
+      final String name, PCollection<T> elements) {
+    PCollection<KnownSize> upcast = elements.apply(castToKnownSize(name));
+    Coder<KnownSize> upcastCoder = makeCastingCoder(elements.getCoder());
+    return upcast.setCoder(upcastCoder);
+  }
+
+ /**
+ * Do not instantiate: this class only offers static members.
+ */
+ private NexmarkUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java
new file mode 100644
index 0000000..4f5304d
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PubsubOptions;
+import javax.annotation.Nullable;
+
+/**
+ * Command line flags.
+ *
+ * <p>Each flag is a getter/setter pair whose {@link Description} is the help text. Most
+ * getters return a boxed type marked {@link Nullable}, presumably so that an unset flag can
+ * be told apart from an explicit value (confirm against the code consuming these options).
+ * Getters returning a primitive {@code boolean} can never yield {@code null} and are
+ * therefore not marked {@link Nullable}.
+ */
+public interface Options extends PubsubOptions {
+  @Description("Which suite to run. Default is to use command line arguments for one job.")
+  @Default.Enum("DEFAULT")
+  NexmarkSuite getSuite();
+
+  void setSuite(NexmarkSuite suite);
+
+  @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.")
+  @Default.Boolean(false)
+  boolean getMonitorJobs();
+
+  void setMonitorJobs(boolean monitorJobs);
+
+  @Description("Where the events come from.")
+  @Nullable
+  NexmarkUtils.SourceType getSourceType();
+
+  void setSourceType(NexmarkUtils.SourceType sourceType);
+
+  @Description("Prefix for input files if using avro input")
+  @Nullable
+  String getInputPath();
+
+  void setInputPath(String inputPath);
+
+  @Description("Where results go.")
+  @Nullable
+  NexmarkUtils.SinkType getSinkType();
+
+  void setSinkType(NexmarkUtils.SinkType sinkType);
+
+  @Description("Which mode to run in when source is PUBSUB.")
+  @Nullable
+  NexmarkUtils.PubSubMode getPubSubMode();
+
+  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
+
+  @Description("Which query to run.")
+  @Nullable
+  Integer getQuery();
+
+  void setQuery(Integer query);
+
+  @Description("Prefix for output files if using text output for results or running Query 10.")
+  @Nullable
+  String getOutputPath();
+
+  void setOutputPath(String outputPath);
+
+  @Description("Base name of pubsub topic to publish to in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubTopic();
+
+  void setPubsubTopic(String pubsubTopic);
+
+  @Description("Base name of pubsub subscription to read from in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubSubscription();
+
+  void setPubsubSubscription(String pubsubSubscription);
+
+  @Description("Base name of BigQuery table name if using BigQuery output.")
+  @Nullable
+  @Default.String("nexmark")
+  String getBigQueryTable();
+
+  void setBigQueryTable(String bigQueryTable);
+
+  @Description("Approximate number of events to generate. "
+      + "Zero for effectively unlimited in streaming mode.")
+  @Nullable
+  Long getNumEvents();
+
+  void setNumEvents(Long numEvents);
+
+  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
+      + "of the pipeline.")
+  @Nullable
+  Integer getPreloadSeconds();
+
+  void setPreloadSeconds(Integer preloadSeconds);
+
+  @Description("Number of unbounded sources to create events.")
+  @Nullable
+  Integer getNumEventGenerators();
+
+  void setNumEventGenerators(Integer numEventGenerators);
+
+  @Description("Shape of event rate curve.")
+  @Nullable
+  NexmarkUtils.RateShape getRateShape();
+
+  void setRateShape(NexmarkUtils.RateShape rateShape);
+
+  @Description("Initial overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getFirstEventRate();
+
+  void setFirstEventRate(Integer firstEventRate);
+
+  @Description("Next overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getNextEventRate();
+
+  void setNextEventRate(Integer nextEventRate);
+
+  @Description("Unit for rates.")
+  @Nullable
+  NexmarkUtils.RateUnit getRateUnit();
+
+  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
+
+  @Description("Overall period of rate shape, in seconds.")
+  @Nullable
+  Integer getRatePeriodSec();
+
+  void setRatePeriodSec(Integer ratePeriodSec);
+
+  @Description("If true, relay events in real time in streaming mode.")
+  @Nullable
+  Boolean getIsRateLimited();
+
+  void setIsRateLimited(Boolean isRateLimited);
+
+  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
+      + " time in the past so that multiple runs will see exactly the same event streams"
+      + " and should thus have exactly the same results.")
+  @Nullable
+  Boolean getUseWallclockEventTime();
+
+  void setUseWallclockEventTime(Boolean useWallclockEventTime);
+
+  // Primitive boolean: never null, so no @Nullable.
+  @Description("Assert pipeline results match model results.")
+  boolean getAssertCorrectness();
+
+  void setAssertCorrectness(boolean assertCorrectness);
+
+  // Primitive boolean: never null, so no @Nullable.
+  @Description("Log all input events.")
+  boolean getLogEvents();
+
+  void setLogEvents(boolean logEvents);
+
+  // Primitive boolean: never null, so no @Nullable.
+  @Description("Log all query results.")
+  boolean getLogResults();
+
+  void setLogResults(boolean logResults);
+
+  @Description("Average size in bytes for a person record.")
+  @Nullable
+  Integer getAvgPersonByteSize();
+
+  void setAvgPersonByteSize(Integer avgPersonByteSize);
+
+  @Description("Average size in bytes for an auction record.")
+  @Nullable
+  Integer getAvgAuctionByteSize();
+
+  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
+
+  @Description("Average size in bytes for a bid record.")
+  @Nullable
+  Integer getAvgBidByteSize();
+
+  void setAvgBidByteSize(Integer avgBidByteSize);
+
+  @Description("Ratio of bids for 'hot' auctions above the background.")
+  @Nullable
+  Integer getHotAuctionRatio();
+
+  void setHotAuctionRatio(Integer hotAuctionRatio);
+
+  @Description("Ratio of auctions for 'hot' sellers above the background.")
+  @Nullable
+  Integer getHotSellersRatio();
+
+  void setHotSellersRatio(Integer hotSellersRatio);
+
+  @Description("Ratio of auctions for 'hot' bidders above the background.")
+  @Nullable
+  Integer getHotBiddersRatio();
+
+  void setHotBiddersRatio(Integer hotBiddersRatio);
+
+  @Description("Window size in seconds.")
+  @Nullable
+  Long getWindowSizeSec();
+
+  void setWindowSizeSec(Long windowSizeSec);
+
+  @Description("Window period in seconds.")
+  @Nullable
+  Long getWindowPeriodSec();
+
+  void setWindowPeriodSec(Long windowPeriodSec);
+
+  @Description("If in streaming mode, the holdback for watermark in seconds.")
+  @Nullable
+  Long getWatermarkHoldbackSec();
+
+  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
+
+  @Description("Roughly how many auctions should be in flight for each generator.")
+  @Nullable
+  Integer getNumInFlightAuctions();
+
+  void setNumInFlightAuctions(Integer numInFlightAuctions);
+
+  @Description("Maximum number of people to consider as active for placing auctions or bids.")
+  @Nullable
+  Integer getNumActivePeople();
+
+  void setNumActivePeople(Integer numActivePeople);
+
+  @Description("Filename of perf data to append to.")
+  @Nullable
+  String getPerfFilename();
+
+  void setPerfFilename(String perfFilename);
+
+  @Description("Filename of baseline perf data to read from.")
+  @Nullable
+  String getBaselineFilename();
+
+  void setBaselineFilename(String baselineFilename);
+
+  @Description("Filename of summary perf data to append to.")
+  @Nullable
+  String getSummaryFilename();
+
+  void setSummaryFilename(String summaryFilename);
+
+  @Description("Filename for javascript capturing all perf data and any baselines.")
+  @Nullable
+  String getJavascriptFilename();
+
+  void setJavascriptFilename(String javascriptFilename);
+
+  // Primitive boolean: never null, so no @Nullable.
+  @Description("If true, don't run the actual query. Instead, calculate the distribution "
+      + "of number of query results per (event time) minute according to the query model.")
+  boolean getJustModelResultRate();
+
+  void setJustModelResultRate(boolean justModelResultRate);
+
+  @Description("Coder strategy to use.")
+  @Nullable
+  NexmarkUtils.CoderStrategy getCoderStrategy();
+
+  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
+
+  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
+      + "number of milliseconds to simulate CPU-bound computation.")
+  @Nullable
+  Long getCpuDelayMs();
+
+  void setCpuDelayMs(Long cpuDelayMs);
+
+  @Description("Extra data, in bytes, to save to persistent state for each event. "
+      + "This will force I/O all the way to durable storage to simulate an "
+      + "I/O-bound computation.")
+  @Nullable
+  Long getDiskBusyBytes();
+
+  void setDiskBusyBytes(Long diskBusyBytes);
+
+  @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction")
+  @Nullable
+  Integer getAuctionSkip();
+
+  void setAuctionSkip(Integer auctionSkip);
+
+  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
+  @Nullable
+  Integer getFanout();
+
+  void setFanout(Integer fanout);
+
+  @Description("Length of occasional delay to impose on events (in seconds).")
+  @Nullable
+  Long getOccasionalDelaySec();
+
+  void setOccasionalDelaySec(Long occasionalDelaySec);
+
+  // The help text names the companion flag that carries the length of the delay.
+  @Description("Probability that an event will be delayed by --occasionalDelaySec.")
+  @Nullable
+  Double getProbDelayedEvent();
+
+  void setProbDelayedEvent(Double probDelayedEvent);
+
+  @Description("Maximum size of each log file (in events). For Query10 only.")
+  @Nullable
+  Integer getMaxLogEvents();
+
+  void setMaxLogEvents(Integer maxLogEvents);
+
+  @Description("How to derive names of resources.")
+  @Default.Enum("QUERY_AND_SALT")
+  NexmarkUtils.ResourceNameMode getResourceNameMode();
+
+  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
+
+  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
+  @Default.Boolean(true)
+  boolean getManageResources();
+
+  void setManageResources(boolean manageResources);
+
+  @Description("If true, use pub/sub publish time instead of event time.")
+  @Nullable
+  Boolean getUsePubsubPublishTime();
+
+  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
+
+  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
+      + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
+  @Nullable
+  Long getOutOfOrderGroupSize();
+
+  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
+
+  @Description("If false, do not add the Monitor and Snoop transforms.")
+  @Nullable
+  Boolean getDebug();
+
+  void setDebug(Boolean value);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java
new file mode 100644
index 0000000..6fcf388
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A person either creating an auction or making a bid.
+ */
+public class Person implements KnownSize, Serializable {
+ private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+ private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+ public static final Coder<Person> CODER = new AtomicCoder<Person>() {
+ @Override
+ public void encode(Person value, OutputStream outStream,
+ Coder.Context context)
+ throws CoderException, IOException {
+ LONG_CODER.encode(value.id, outStream, Context.NESTED);
+ STRING_CODER.encode(value.name, outStream, Context.NESTED);
+ STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
+ STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
+ STRING_CODER.encode(value.city, outStream, Context.NESTED);
+ STRING_CODER.encode(value.state, outStream, Context.NESTED);
+ LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+ STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+ }
+
+ @Override
+ public Person decode(
+ InputStream inStream, Coder.Context context)
+ throws CoderException, IOException {
+ long id = LONG_CODER.decode(inStream, Context.NESTED);
+ String name = STRING_CODER.decode(inStream, Context.NESTED);
+ String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
+ String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
+ String city = STRING_CODER.decode(inStream, Context.NESTED);
+ String state = STRING_CODER.decode(inStream, Context.NESTED);
+ long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+ String extra = STRING_CODER.decode(inStream, Context.NESTED);
+ return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
+ }
+ };
+
+ /** Id of person. */
+ @JsonProperty
+ public final long id; // primary key
+
+ /** Extra person properties. */
+ @JsonProperty
+ public final String name;
+
+ @JsonProperty
+ public final String emailAddress;
+
+ @JsonProperty
+ public final String creditCard;
+
+ @JsonProperty
+ public final String city;
+
+ @JsonProperty
+ public final String state;
+
+ @JsonProperty
+ public final long dateTime;
+
+ /** Additional arbitrary payload for performance testing. */
+ @JsonProperty
+ public final String extra;
+
+ // For Avro only.
+ @SuppressWarnings("unused")
+ private Person() {
+ id = 0;
+ name = null;
+ emailAddress = null;
+ creditCard = null;
+ city = null;
+ state = null;
+ dateTime = 0;
+ extra = null;
+ }
+
+ public Person(long id, String name, String emailAddress, String creditCard, String city,
+ String state, long dateTime, String extra) {
+ this.id = id;
+ this.name = name;
+ this.emailAddress = emailAddress;
+ this.creditCard = creditCard;
+ this.city = city;
+ this.state = state;
+ this.dateTime = dateTime;
+ this.extra = extra;
+ }
+
+ /**
+ * Return a copy of person which capture the given annotation.
+ * (Used for debugging).
+ */
+ public Person withAnnotation(String annotation) {
+ return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+ annotation + ": " + extra);
+ }
+
+ /**
+ * Does person have {@code annotation}? (Used for debugging.)
+ */
+ public boolean hasAnnotation(String annotation) {
+ return extra.startsWith(annotation + ": ");
+ }
+
+ /**
+ * Remove {@code annotation} from person. (Used for debugging.)
+ */
+ public Person withoutAnnotation(String annotation) {
+ if (hasAnnotation(annotation)) {
+ return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+ extra.substring(annotation.length() + 2));
+ } else {
+ return this;
+ }
+ }
+
+ @Override
+ public long sizeInBytes() {
+ return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
+ + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return NexmarkUtils.MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
new file mode 100644
index 0000000..1255154
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubJsonClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Helper for working with pubsub.
+ */
+public class PubsubHelper implements AutoCloseable {
+ /**
+ * Underlying pub/sub client.
+ */
+ private final PubsubClient pubsubClient;
+
+ /**
+ * Project id.
+ */
+ private final String projectId;
+
+ /**
+ * Topics we should delete on close.
+ */
+ private final List<PubsubClient.TopicPath> createdTopics;
+
+ /**
+ * Subscriptions we should delete on close.
+ */
+ private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
+
+ private PubsubHelper(PubsubClient pubsubClient, String projectId) {
+ this.pubsubClient = pubsubClient;
+ this.projectId = projectId;
+ createdTopics = new ArrayList<>();
+ createdSubscriptions = new ArrayList<>();
+ }
+
+ /**
+ * Create a helper.
+ */
+ public static PubsubHelper create(PubsubOptions options) {
+ try {
+ return new PubsubHelper(
+ PubsubJsonClient.FACTORY.newClient(null, null, options),
+ options.getProject());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create Pubsub client: ", e);
+ }
+ }
+
+ /**
+ * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
+ * deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.TopicPath createTopic(String shortTopic) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ try {
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("attempting to cleanup topic %s", topic);
+ pubsubClient.deleteTopic(topic);
+ }
+ NexmarkUtils.console("create topic %s", topic);
+ pubsubClient.createTopic(topic);
+ createdTopics.add(topic);
+ return topic;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
+ }
+ }
+
+ /**
+ * Create a topic from short name if it does not already exist. The topic will not be
+ * deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ try {
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("topic %s already exists", topic);
+ return topic;
+ }
+ NexmarkUtils.console("create topic %s", topic);
+ pubsubClient.createTopic(topic);
+ return topic;
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
+ }
+ }
+
+ /**
+ * Check a topic corresponding to short name exists, and throw exception if not. The
+ * topic will not be deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.TopicPath reuseTopic(String shortTopic) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ if (topicExists(shortTopic)) {
+ NexmarkUtils.console("reusing existing topic %s", topic);
+ return topic;
+ }
+ throw new RuntimeException("topic '" + topic + "' does not already exist");
+ }
+
+ /**
+ * Does topic corresponding to short name exist?
+ */
+ public boolean topicExists(String shortTopic) {
+ PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ try {
+ Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
+ return existingTopics.contains(topic);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
+ }
+ }
+
+ /**
+ * Create subscription from short name. Delete subscription if it already exists. Ensure the
+ * subscription will be deleted on cleanup. Return full subscription name.
+ */
+ public PubsubClient.SubscriptionPath createSubscription(
+ String shortTopic, String shortSubscription) {
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ PubsubClient.SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+ try {
+ if (subscriptionExists(shortTopic, shortSubscription)) {
+ NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
+ pubsubClient.deleteSubscription(subscription);
+ }
+ NexmarkUtils.console("create subscription %s", subscription);
+ pubsubClient.createSubscription(topic, subscription, 60);
+ createdSubscriptions.add(subscription);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
+ }
+ return subscription;
+ }
+
+ /**
+ * Check a subscription corresponding to short name exists, and throw exception if not. The
+ * subscription will not be deleted on cleanup. Return full topic name.
+ */
+ public PubsubClient.SubscriptionPath reuseSubscription(
+ String shortTopic, String shortSubscription) {
+ PubsubClient.SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+ if (subscriptionExists(shortTopic, shortSubscription)) {
+ NexmarkUtils.console("reusing existing subscription %s", subscription);
+ return subscription;
+ }
+ throw new RuntimeException("subscription'" + subscription + "' does not already exist");
+ }
+
+ /**
+ * Does subscription corresponding to short name exist?
+ */
+ public boolean subscriptionExists(String shortTopic, String shortSubscription) {
+ PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+ PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+ PubsubClient.SubscriptionPath subscription =
+ PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+ try {
+ Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
+ pubsubClient.listSubscriptions(project, topic);
+ return existingSubscriptions.contains(subscription);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e);
+ }
+ }
+
+ /**
+ * Delete all the subscriptions and topics we created.
+ */
+ @Override
+ public void close() {
+ for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
+ try {
+ NexmarkUtils.console("delete subscription %s", subscription);
+ pubsubClient.deleteSubscription(subscription);
+ } catch (IOException ex) {
+ NexmarkUtils.console("could not delete subscription %s", subscription);
+ }
+ }
+ for (PubsubClient.TopicPath topic : createdTopics) {
+ try {
+ NexmarkUtils.console("delete topic %s", topic);
+ pubsubClient.deleteTopic(topic);
+ } catch (IOException ex) {
+ NexmarkUtils.console("could not delete topic %s", topic);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java
new file mode 100644
index 0000000..ea0d7ca
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.values.PCollection;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Query 0: Pass events through unchanged. However, force them to do a round trip through
+ * serialization so that we measure the impact of the choice of coders.
+ */
+public class Query0 extends NexmarkQuery {
+ public Query0(NexmarkConfiguration configuration) {
+ super(configuration, "Query0");
+ }
+
+ private PCollection<Event> applyTyped(PCollection<Event> events) {
+ final Coder<Event> coder = events.getCoder();
+
+ return events
+
+ // Force round trip through coder.
+ .apply(
+ ParDo.named(name + ".Serialize")
+ .of(new DoFn<Event, Event>() {
+ private final Aggregator<Long, Long> bytes =
+ createAggregator("bytes", new SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) throws CoderException, IOException {
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ coder.encode(c.element(), outStream, Coder.Context.OUTER);
+ byte[] byteArray = outStream.toByteArray();
+ bytes.addValue((long) byteArray.length);
+ ByteArrayInputStream inStream = new ByteArrayInputStream(byteArray);
+ Event event = coder.decode(inStream, Coder.Context.OUTER);
+ c.output(event);
+ }
+ }));
+ }
+
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
new file mode 100644
index 0000000..f3ceca2
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A direct implementation of {@link Query0}.
+ */
+public class Query0Model extends NexmarkQueryModel {
+ /**
+ * Simulator for query 0.
+ */
+ private class Simulator extends AbstractSimulator<Event, Event> {
+ public Simulator(NexmarkConfiguration configuration) {
+ super(NexmarkUtils.standardEventIterator(configuration));
+ }
+
+ @Override
+ protected void run() {
+ TimestampedValue<Event> timestampedEvent = nextInput();
+ if (timestampedEvent == null) {
+ allDone();
+ return;
+ }
+ addResult(timestampedEvent);
+ }
+ }
+
+ public Query0Model(NexmarkConfiguration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ protected AbstractSimulator<?, ?> simulator() {
+ return new Simulator(configuration);
+ }
+
+ @Override
+ protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+ return toValueTimestampOrder(itr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java
new file mode 100644
index 0000000..7e60b9c
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
+ * FROM bid [ROWS UNBOUNDED];
+ * </pre>
+ *
+ * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
+ * slowed down.
+ */
+class Query1 extends NexmarkQuery {
+ public Query1(NexmarkConfiguration configuration) {
+ super(configuration, "Query1");
+ }
+
+ private PCollection<Bid> applyTyped(PCollection<Event> events) {
+ return events
+ // Only want the bid events.
+ .apply(JUST_BIDS)
+
+ // Map the conversion function over all bids.
+ .apply(
+ ParDo.named(name + ".ToEuros")
+ .of(new DoFn<Bid, Bid>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ Bid bid = c.element();
+ c.output(new Bid(
+ bid.auction, bid.bidder, (bid.price * 89) / 100, bid.dateTime, bid.extra));
+ }
+ }));
+ }
+
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java
new file mode 100644
index 0000000..74fb28c
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnWithContext;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+
+/**
+ * Query "10", 'Log to sharded files' (Not in original suite.)
+ *
+ * <p>Every windowSizeSec, save all events from the last period into log files spread over
+ * {@code maxNumWorkers * NUM_SHARDS_PER_WORKER} shards.
+ */
+class Query10 extends NexmarkQuery {
+ private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
+ private static final int NUM_SHARDS_PER_WORKER = 5;
+ private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
+
+ /**
+  * Everything we need to know about the records in a single output file: the shard and pane
+  * they belong to, and the path of the file holding them (if any).
+  */
+ private static class OutputFile implements Serializable {
+   /** Maximum possible timestamp of records in file. */
+   private final Instant maxTimestamp;
+   /** Shard within window. */
+   private final String shard;
+   /** Index of file in all files in shard. */
+   private final long index;
+   /** Timing of records in this file. */
+   private final PaneInfo.Timing timing;
+   /** Path to file containing records, or {@literal null} if no output required. */
+   @Nullable
+   private final String filename;
+
+   public OutputFile(
+       Instant maxTimestamp,
+       String shard,
+       long index,
+       PaneInfo.Timing timing,
+       @Nullable String filename) {
+     this.filename = filename;
+     this.timing = timing;
+     this.index = index;
+     this.shard = shard;
+     this.maxTimestamp = maxTimestamp;
+   }
+
+   /** Render all fields, space separated, as a single newline-terminated line. */
+   @Override
+   public String toString() {
+     final String lineFormat = "%s %s %d %s %s\n";
+     return String.format(lineFormat, maxTimestamp, shard, index, timing, filename);
+   }
+ }
+
+ /**
+ * GCS uri prefix for all log and 'finished' files. If null they won't be written.
+ */
+ @Nullable
+ private String outputPath;
+
+ /**
+ * Maximum number of workers, used to determine log sharding factor.
+ */
+ private int maxNumWorkers;
+
+ /**
+  * Construct query 10 for {@code configuration}. The output path and maximum number of
+  * workers are supplied separately through their setters.
+  */
+ public Query10(NexmarkConfiguration configuration) {
+   super(configuration, "Query10");
+ }
+
+ /**
+  * Set the GCS uri prefix for all log and 'finished' files, or {@literal null} to write none.
+  */
+ public void setOutputPath(@Nullable String outputPath) {
+   this.outputPath = outputPath;
+ }
+
+ /**
+  * Set the maximum number of workers, used to determine the log sharding factor.
+  */
+ public void setMaxNumWorkers(int maxNumWorkers) {
+   this.maxNumWorkers = maxNumWorkers;
+ }
+
+ /**
+  * Return channel for writing bytes to GCS, created for {@code filename} with content type
+  * {@code text/plain} and its upload buffer size set to {@code CHANNEL_BUFFER}.
+  *
+  * @throws IOException if the channel cannot be created
+  * @throws IllegalStateException if the created channel is not a
+  *     {@link GoogleCloudStorageWriteChannel}
+  */
+ private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
+     throws IOException {
+   WritableByteChannel channel = new GcsIOChannelFactory(options).create(filename, "text/plain");
+   // The buffer size can only be set on the GCS-specific channel type, so say what we got
+   // instead if the factory hands back anything else.
+   Preconditions.checkState(
+       channel instanceof GoogleCloudStorageWriteChannel,
+       "Expected a GoogleCloudStorageWriteChannel for '%s' but got %s",
+       filename,
+       channel);
+   ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
+   return channel;
+ }
+
+ /**
+  * Return a short string to describe {@code timing}: {@code "E"} for early, {@code "O"} for
+  * on-time and {@code "L"} for late.
+  *
+  * @throws RuntimeException if {@code timing} is any other value
+  */
+ private String timingToString(PaneInfo.Timing timing) {
+   switch (timing) {
+     case EARLY:
+       return "E";
+     case ON_TIME:
+       return "O";
+     case LATE:
+       return "L";
+     default:
+       // The switch does not prove the cases exhaustive, so name the offending value to make
+       // the failure diagnosable.
+       throw new RuntimeException("Unexpected pane timing: " + timing);
+   }
+ }
+
+ /**
+  * Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}.
+  * The file name is {@literal null} when no output path is set; otherwise it combines the
+  * window's maximum timestamp, the shard, the pane index and timing, and a random suffix.
+  */
+ private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
+   Instant windowMaxTimestamp = window.maxTimestamp();
+   long paneIndex = pane.getIndex();
+   PaneInfo.Timing paneTiming = pane.getTiming();
+
+   @Nullable String filename = null;
+   if (outputPath != null) {
+     filename =
+         String.format(
+             "%s/LOG-%s-%s-%03d-%s-%x",
+             outputPath,
+             windowMaxTimestamp,
+             shard,
+             paneIndex,
+             timingToString(paneTiming),
+             ThreadLocalRandom.current().nextLong());
+   }
+   return new OutputFile(windowMaxTimestamp, shard, paneIndex, paneTiming, filename);
+ }
+
+ /**
+  * Return path to which we should write the index for {@code window}, built from the output
+  * path and the window's maximum timestamp, or {@literal null} if no output required.
+  */
+ @Nullable
+ private String indexPathFor(BoundedWindow window) {
+   return outputPath == null
+       ? null
+       : String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
+ }
+
+ private PCollection<Done> applyTyped(PCollection<Event> events) {
+ final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
+
+ return events
+ .apply(ParDo.named(name + ".ShardEvents")
+ .of(new DoFn<Event, KV<String, Event>>() {
+ final Aggregator<Long, Long> lateCounter =
+ createAggregator("actuallyLateEvent", new SumLongFn());
+ final Aggregator<Long, Long> onTimeCounter =
+ createAggregator("actuallyOnTimeEvent", new SumLongFn());
+
+ @Override
+ public void processElement(ProcessContext c) {
+ if (c.element().hasAnnotation("LATE")) {
+ lateCounter.addValue(1L);
+ NexmarkUtils.error("Observed late: %s", c.element());
+ } else {
+ onTimeCounter.addValue(1L);
+ }
+ int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+ String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+ c.output(KV.of(shard, c.element()));
+ }
+ }))
+ .apply(Window.<KV<String, Event>>into(
+ FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+ .named(name + ".WindowEvents")
+ .triggering(AfterEach.inOrder(
+ Repeatedly
+ .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+ .orFinally(AfterWatermark.pastEndOfWindow()),
+ Repeatedly.forever(
+ AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(LATE_BATCHING_PERIOD)))))
+ .discardingFiredPanes()
+ // Use a 1 day allowed lateness so that any forgotten hold will stall the
+ // pipeline for that period and be very noticeable.
+ .withAllowedLateness(Duration.standardDays(1)))
+ .apply(GroupByKey.<String, Event>create())
+ .apply(
+ ParDo.named(name + ".CheckForLateEvents")
+ .of(new DoFnWithContext<KV<String, Iterable<Event>>,
+ KV<String, Iterable<Event>>>() {
+ final Aggregator<Long, Long> earlyCounter =
+ createAggregator("earlyShard", new SumLongFn());
+ final Aggregator<Long, Long> onTimeCounter =
+ createAggregator("onTimeShard", new SumLongFn());
+ final Aggregator<Long, Long> lateCounter =
+ createAggregator("lateShard", new SumLongFn());
+ final Aggregator<Long, Long> unexpectedLatePaneCounter =
+ createAggregator("ERROR_unexpectedLatePane", new SumLongFn());
+ final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
+ createAggregator("ERROR_unexpectedOnTimeElement", new SumLongFn());
+
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ int numLate = 0;
+ int numOnTime = 0;
+ for (Event event : c.element().getValue()) {
+ if (event.hasAnnotation("LATE")) {
+ numLate++;
+ } else {
+ numOnTime++;
+ }
+ }
+ String shard = c.element().getKey();
+ NexmarkUtils.error(
+ "%s with timestamp %s has %d actually late and %d on-time "
+ + "elements in pane %s for window %s",
+ shard, c.timestamp(), numLate, numOnTime, c.pane(),
+ window.maxTimestamp());
+ if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+ if (numLate == 0) {
+ NexmarkUtils.error(
+ "ERROR! No late events in late pane for %s", shard);
+ unexpectedLatePaneCounter.addValue(1L);
+ }
+ if (numOnTime > 0) {
+ NexmarkUtils.error(
+ "ERROR! Have %d on-time events in late pane for %s",
+ numOnTime, shard);
+ unexpectedOnTimeElementCounter.addValue(1L);
+ }
+ lateCounter.addValue(1L);
+ } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+ if (numOnTime + numLate < configuration.maxLogEvents) {
+ NexmarkUtils.error(
+ "ERROR! Only have %d events in early pane for %s",
+ numOnTime + numLate, shard);
+ }
+ earlyCounter.addValue(1L);
+ } else {
+ onTimeCounter.addValue(1L);
+ }
+ c.output(c.element());
+ }
+ }))
+ .apply(
+ ParDo.named(name + ".UploadEvents")
+ .of(new DoFnWithContext<KV<String, Iterable<Event>>,
+ KV<Void, OutputFile>>() {
+ final Aggregator<Long, Long> savedFileCounter =
+ createAggregator("savedFile", new SumLongFn());
+ final Aggregator<Long, Long> writtenRecordsCounter =
+ createAggregator("writtenRecords", new SumLongFn());
+
+ @ProcessElement
+ public void process(ProcessContext c, BoundedWindow window) throws IOException {
+ String shard = c.element().getKey();
+ GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+ OutputFile outputFile = outputFileFor(window, shard, c.pane());
+ NexmarkUtils.error(
+ "Writing %s with record timestamp %s, window timestamp %s, pane %s",
+ shard, c.timestamp(), window.maxTimestamp(), c.pane());
+ if (outputFile.filename != null) {
+ NexmarkUtils.error("Beginning write to '%s'", outputFile.filename);
+ int n = 0;
+ try (OutputStream output =
+ Channels.newOutputStream(openWritableGcsFile(options, outputFile
+ .filename))) {
+ for (Event event : c.element().getValue()) {
+ Event.CODER.encode(event, output, Coder.Context.OUTER);
+ writtenRecordsCounter.addValue(1L);
+ if (++n % 10000 == 0) {
+ NexmarkUtils.error("So far written %d records to '%s'", n,
+ outputFile.filename);
+ }
+ }
+ }
+ NexmarkUtils.error("Written all %d records to '%s'", n, outputFile.filename);
+ }
+ savedFileCounter.addValue(1L);
+ c.output(KV.<Void, OutputFile>of(null, outputFile));
+ }
+ }))
+ // Clear fancy triggering from above.
+ .apply(Window.<KV<Void, OutputFile>>into(
+ FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+ .named(name + ".WindowLogFiles")
+ .triggering(AfterWatermark.pastEndOfWindow())
+ // We expect no late data here, but we'll assume the worst so we can detect any.
+ .withAllowedLateness(Duration.standardDays(1))
+ .discardingFiredPanes())
+ .apply(GroupByKey.<Void, OutputFile>create())
+ .apply(
+ ParDo.named(name + ".Index")
+ .of(new DoFnWithContext<KV<Void, Iterable<OutputFile>>, Done>() {
+ final Aggregator<Long, Long> unexpectedLateCounter =
+ createAggregator("ERROR_unexpectedLate", new SumLongFn());
+ final Aggregator<Long, Long> unexpectedEarlyCounter =
+ createAggregator("ERROR_unexpectedEarly", new SumLongFn());
+ final Aggregator<Long, Long> unexpectedIndexCounter =
+ createAggregator("ERROR_unexpectedIndex", new SumLongFn());
+ final Aggregator<Long, Long> finalizedCounter =
+ createAggregator("indexed", new SumLongFn());
+
+ @ProcessElement
+ public void process(ProcessContext c, BoundedWindow window) throws IOException {
+ if (c.pane().getTiming() == Timing.LATE) {
+ unexpectedLateCounter.addValue(1L);
+ NexmarkUtils.error("ERROR! Unexpected LATE pane: %s", c.pane());
+ } else if (c.pane().getTiming() == Timing.EARLY) {
+ unexpectedEarlyCounter.addValue(1L);
+ NexmarkUtils.error("ERROR! Unexpected EARLY pane: %s", c.pane());
+ } else if (c.pane().getTiming() == Timing.ON_TIME
+ && c.pane().getIndex() != 0) {
+ unexpectedIndexCounter.addValue(1L);
+ NexmarkUtils.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
+ } else {
+ GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+ NexmarkUtils.error(
+ "Index with record timestamp %s, window timestamp %s, pane %s",
+ c.timestamp(), window.maxTimestamp(), c.pane());
+
+ @Nullable String filename = indexPathFor(window);
+ if (filename != null) {
+ NexmarkUtils.error("Beginning write to '%s'", filename);
+ int n = 0;
+ try (OutputStream output =
+ Channels.newOutputStream(
+ openWritableGcsFile(options, filename))) {
+ for (OutputFile outputFile : c.element().getValue()) {
+ output.write(outputFile.toString().getBytes());
+ n++;
+ }
+ }
+ NexmarkUtils.error("Written all %d lines to '%s'", n, filename);
+ }
+ c.output(
+ new Done("written for timestamp " + window.maxTimestamp()));
+ finalizedCounter.addValue(1L);
+ }
+ }
+ }));
+ }
+
+ /**
+ * Runs {@code applyTyped} over {@code events} and passes the result, together with this
+ * query's {@code name}, to {@code NexmarkUtils.castToKnownSize}.
+ *
+ * @param events the events this query is applied to
+ * @return whatever {@code NexmarkUtils.castToKnownSize} returns for the typed result
+ */
+ @Override
+ protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+ return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java
new file mode 100644
index 0000000..9841421
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+
+/**
+ * Query "11", 'User sessions' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap.
+ * However limit the session to at most {@code maxLogEvents}. Emit the number of
+ * bids per session.
+ */
+ class Query11 extends NexmarkQuery {
+   /** Creates the query, handing {@code configuration} and the name "Query11" to the base class. */
+   public Query11(NexmarkConfiguration configuration) {
+     super(configuration, "Query11");
+   }
+
+   /**
+    * Builds the typed pipeline: keeps only bids, keys each one by its bidder, windows the keys
+    * into sessions, counts the elements per key and wraps every count in a
+    * {@link BidsPerSession}.
+    *
+    * @param events the events to run the query over
+    * @return one {@link BidsPerSession} per (bidder, fired pane) count
+    */
+   private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+     // Only the bidder id is needed for counting, so the value side of the pair is null.
+     DoFn<Bid, KV<Long, Void>> rekeyByBidder =
+         new DoFn<Bid, KV<Long, Void>>() {
+           @Override
+           public void processElement(ProcessContext c) {
+             c.output(KV.of(c.element().bidder, (Void) null));
+           }
+         };
+
+     // Turns each (bidder, count) pair into the query's result type.
+     DoFn<KV<Long, Long>, BidsPerSession> toResult =
+         new DoFn<KV<Long, Long>, BidsPerSession>() {
+           @Override
+           public void processElement(ProcessContext c) {
+             KV<Long, Long> countForBidder = c.element();
+             c.output(new BidsPerSession(countForBidder.getKey(), countForBidder.getValue()));
+           }
+         };
+
+     Duration sessionGap = Duration.standardSeconds(configuration.windowSizeSec);
+     Duration allowedLateness = Duration.standardSeconds(configuration.occasionalDelaySec / 2);
+
+     return events
+         .apply(JUST_BIDS)
+         .apply(ParDo.named(name + ".Rekey").of(rekeyByBidder))
+         .apply(
+             Window.<KV<Long, Void>>into(Sessions.withGapDuration(sessionGap))
+                 // Fire repeatedly, each time a pane holds at least maxLogEvents elements.
+                 .triggering(
+                     Repeatedly.forever(
+                         AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
+                 .discardingFiredPanes()
+                 .withAllowedLateness(allowedLateness))
+         .apply(Count.<Long, Void>perKey())
+         .apply(ParDo.named(name + ".ToResult").of(toResult));
+   }
+
+   /**
+    * Runs {@link #applyTyped} and passes the result, with this query's {@code name}, to
+    * {@code NexmarkUtils.castToKnownSize}.
+    */
+   @Override
+   protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+     PCollection<BidsPerSession> bidsPerSession = applyTyped(events);
+     return NexmarkUtils.castToKnownSize(name, bidsPerSession);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java
new file mode 100644
index 0000000..dd39971
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "12", 'Processing time windows' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into processing time windows of windowSize. Emit the count
+ * of bids per window.
+ */
+ class Query12 extends NexmarkQuery {
+   /** Creates the query, handing {@code configuration} and the name "Query12" to the base class. */
+   public Query12(NexmarkConfiguration configuration) {
+     super(configuration, "Query12");
+   }
+
+   /**
+    * Builds the typed pipeline: keeps only bids, keys each one by its bidder, places the keys in
+    * the global window with a repeating processing-time trigger, counts the elements per key and
+    * wraps every count in a {@link BidsPerSession}.
+    *
+    * @param events the events to run the query over
+    * @return one {@link BidsPerSession} per (bidder, fired pane) count
+    */
+   private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+     // Delay between the first element of a pane and the firing of that pane.
+     Duration firingDelay = Duration.standardSeconds(configuration.windowSizeSec);
+
+     // Only the bidder id is needed for counting, so the value side of the pair is null.
+     PCollection<KV<Long, Void>> bidders =
+         events
+             .apply(JUST_BIDS)
+             .apply(
+                 ParDo.named(name + ".Rekey")
+                     .of(new DoFn<Bid, KV<Long, Void>>() {
+                       @Override
+                       public void processElement(ProcessContext c) {
+                         c.output(KV.of(c.element().bidder, (Void) null));
+                       }
+                     }));
+
+     PCollection<KV<Long, Void>> windowedBidders =
+         bidders.apply(
+             Window.<KV<Long, Void>>into(new GlobalWindows())
+                 .triggering(
+                     Repeatedly.forever(
+                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(firingDelay)))
+                 .discardingFiredPanes()
+                 .withAllowedLateness(Duration.ZERO));
+
+     PCollection<KV<Long, Long>> countsPerBidder =
+         windowedBidders.apply(Count.<Long, Void>perKey());
+
+     return countsPerBidder.apply(
+         ParDo.named(name + ".ToResult")
+             .of(new DoFn<KV<Long, Long>, BidsPerSession>() {
+               @Override
+               public void processElement(ProcessContext c) {
+                 KV<Long, Long> countForBidder = c.element();
+                 c.output(
+                     new BidsPerSession(countForBidder.getKey(), countForBidder.getValue()));
+               }
+             }));
+   }
+
+   /**
+    * Runs {@link #applyTyped} and passes the result, with this query's {@code name}, to
+    * {@code NexmarkUtils.castToKnownSize}.
+    */
+   @Override
+   protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+     PCollection<BidsPerSession> bidsPerWindow = applyTyped(events);
+     return NexmarkUtils.castToKnownSize(name, bidsPerWindow);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
new file mode 100644
index 0000000..462d426
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A direct implementation of {@link Query1}.
+ */
+ public class Query1Model extends NexmarkQueryModel implements Serializable {
+   /**
+    * Simulator for query 1.
+    *
+    * <p>Declared {@code static}: it works only from the configuration passed to its constructor,
+    * so it does not need (and should not retain) a reference to the enclosing, serializable
+    * model instance.
+    */
+   private static class Simulator extends AbstractSimulator<Event, Bid> {
+     /** Feeds the simulator from {@code NexmarkUtils.standardEventIterator(configuration)}. */
+     public Simulator(NexmarkConfiguration configuration) {
+       super(NexmarkUtils.standardEventIterator(configuration));
+     }
+
+     /**
+      * Handles one input event: signals completion when the input is exhausted, skips events
+      * that carry no bid, and otherwise records a copy of the bid with its price scaled by
+      * 89/100, stamped with the timestamp of the input event.
+      */
+     @Override
+     protected void run() {
+       TimestampedValue<Event> timestampedEvent = nextInput();
+       if (timestampedEvent == null) {
+         // No more input.
+         allDone();
+         return;
+       }
+       Event event = timestampedEvent.getValue();
+       if (event.bid == null) {
+         // Ignore non-bid events.
+         return;
+       }
+       Bid bid = event.bid;
+       // Every field is copied unchanged except the price.
+       Bid resultBid =
+           new Bid(bid.auction, bid.bidder, bid.price * 89 / 100, bid.dateTime, bid.extra);
+       TimestampedValue<Bid> result =
+           TimestampedValue.of(resultBid, timestampedEvent.getTimestamp());
+       addResult(result);
+     }
+   }
+
+   /** Creates the model for the given {@code configuration}. */
+   public Query1Model(NexmarkConfiguration configuration) {
+     super(configuration);
+   }
+
+   /** Returns a new simulator built from this model's configuration. */
+   @Override
+   public AbstractSimulator<?, ?> simulator() {
+     return new Simulator(configuration);
+   }
+
+   /** Converts the results by delegating to {@code toValueTimestampOrder}. */
+   @Override
+   protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+     return toValueTimestampOrder(itr);
+   }
+ }