You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:11 UTC

[03/55] [abbrv] beam git commit: NexMark

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
new file mode 100644
index 0000000..13ed580
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -0,0 +1,681 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.hash.Hashing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Odds 'n Ends used throughout queries and driver.
+ */
+public class NexmarkUtils {
+  // Log under this class's own name so messages are attributed to the code that emits them.
+  private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class);
+
+  /**
+   * Mapper for (de)serializing JSON.
+   */
+  static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Possible sources for events. Exposed on the command line through
+   * {@code Options#getSourceType}.
+   */
+  public enum SourceType {
+    /**
+     * Produce events directly.
+     */
+    DIRECT,
+    /**
+     * Read events from an Avro file. The {@code inputPath} option is described as the prefix
+     * for input files when using avro input.
+     */
+    AVRO,
+    /**
+     * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
+     */
+    PUBSUB
+  }
+
+  /**
+   * Possible sinks for query results. Exposed on the command line through
+   * {@code Options#getSinkType}.
+   */
+  public enum SinkType {
+    /**
+     * Discard all results.
+     */
+    COUNT_ONLY,
+    /**
+     * Discard all results after converting them to strings.
+     */
+    DEVNULL,
+    /**
+     * Write to a PubSub topic. It will be drained by this pipeline.
+     */
+    PUBSUB,
+    /**
+     * Write to a text file. Only works in batch mode. The {@code outputPath} option is
+     * described as the prefix for output files when using text output.
+     */
+    TEXT,
+    /**
+     * Write raw Events to Avro. Only works in batch mode.
+     */
+    AVRO,
+    /**
+     * Write raw Events to BigQuery. The {@code bigQueryTable} option is described as the base
+     * name of the table when using BigQuery output.
+     */
+    BIGQUERY,
+  }
+
+  /**
+   * Pub/sub mode to run in. According to the {@code pubSubMode} option's description this
+   * applies when the source is PUBSUB.
+   */
+  public enum PubSubMode {
+    /**
+     * Publish events to pub/sub, but don't run the query.
+     */
+    PUBLISH_ONLY,
+    /**
+     * Consume events from pub/sub and run the query, but don't publish.
+     */
+    SUBSCRIBE_ONLY,
+    /**
+     * Both publish and consume, but as separate jobs.
+     */
+    COMBINED
+  }
+
+  /**
+   * Coder strategies. The chosen strategy is applied to a pipeline's coder registry by
+   * {@code setupPipeline}.
+   */
+  public enum CoderStrategy {
+    /**
+     * Hand-written. Each Nexmark type is registered with its own {@code CODER} constant.
+     */
+    HAND,
+    /**
+     * Avro. {@code AvroCoder.PROVIDER} is installed as the fallback coder provider.
+     */
+    AVRO,
+    /**
+     * Java serialization. {@code SerializableCoder.PROVIDER} is installed as the fallback
+     * coder provider.
+     */
+    JAVA
+  }
+
+  /**
+   * How to determine resource names.
+   */
+  public enum ResourceNameMode {
+    /** Names are used as provided. */
+    VERBATIM,
+    /** Names are suffixed with the query being run. */
+    QUERY,
+    /** Names are suffixed with the query being run and a random number. */
+    QUERY_AND_SALT;
+  }
+
+  /**
+   * Units for rates.
+   */
+  public enum RateUnit {
+    PER_SECOND(1_000_000L),
+    PER_MINUTE(60_000_000L);
+
+    RateUnit(long usPerUnit) {
+      this.usPerUnit = usPerUnit;
+    }
+
+    /**
+     * Number of microseconds per unit.
+     */
+    private final long usPerUnit;
+
+    /**
+     * Number of microseconds between events at given rate.
+     */
+    public long rateToPeriodUs(long rate) {
+      return (usPerUnit + rate / 2) / rate;
+    }
+  }
+
+  /**
+   * Shape the event rate follows over time when alternating between two rates.
+   */
+  public enum RateShape {
+    SQUARE,
+    SINE;
+
+    /**
+     * Number of discrete steps with which the sine wave is approximated.
+     */
+    private static final int N = 10;
+
+    /**
+     * Return the inter-event delay, in microseconds, each of {@code numGenerators} generators
+     * must observe so that together they produce {@code rate} events per {@code unit}.
+     */
+    public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) {
+      long periodUs = unit.rateToPeriodUs(rate);
+      return periodUs * numGenerators;
+    }
+
+    /**
+     * Return the sequence of inter-event delays, in microseconds, each generator cycles
+     * through so that the overall rate moves between {@code firstRate} and {@code nextRate}
+     * (both in {@code unit}) following this shape. Equal rates yield a single delay, a square
+     * shape yields two, and a sine shape yields {@code N} delays sampled along a cosine that
+     * starts at {@code firstRate}.
+     */
+    public long[] interEventDelayUs(
+        int firstRate, int nextRate, RateUnit unit, int numGenerators) {
+      if (firstRate == nextRate) {
+        // Constant rate: the shape is irrelevant.
+        return new long[] {unit.rateToPeriodUs(firstRate) * numGenerators};
+      }
+
+      if (this == SQUARE) {
+        long firstDelayUs = unit.rateToPeriodUs(firstRate) * numGenerators;
+        long nextDelayUs = unit.rateToPeriodUs(nextRate) * numGenerators;
+        return new long[] {firstDelayUs, nextDelayUs};
+      }
+
+      if (this == SINE) {
+        double midRate = (firstRate + nextRate) / 2.0;
+        double amplitude = (firstRate - nextRate) / 2.0; // negative when the rate ramps up
+        long[] delaysUs = new long[N];
+        for (int step = 0; step < N; step++) {
+          double angle = (2.0 * Math.PI * step) / N;
+          long stepRate = Math.round(midRate + amplitude * Math.cos(angle));
+          delaysUs[step] = unit.rateToPeriodUs(stepRate) * numGenerators;
+        }
+        return delaysUs;
+      }
+
+      throw new RuntimeException(); // every shape is handled above
+    }
+
+    /**
+     * Return how long, in seconds, each entry of the array produced by
+     * {@link #interEventDelayUs} should be held so that the whole sequence repeats every
+     * {@code ratePeriodSec}. The division rounds up.
+     */
+    public int stepLengthSec(int ratePeriodSec) {
+      int steps = 0;
+      if (this == SQUARE) {
+        steps = 2;
+      } else if (this == SINE) {
+        steps = N;
+      }
+      return (ratePeriodSec + steps - 1) / steps;
+    }
+  }
+
+  /**
+   * Set to true to capture all info messages. The logging level flags don't currently work.
+   * While this is false, {@link #info} does nothing.
+   */
+  private static final boolean LOG_INFO = false;
+
+  /**
+   * Set to true to capture all error messages. The logging level flags don't currently work.
+   * While this is false, {@link #error} does nothing.
+   */
+  private static final boolean LOG_ERROR = true;
+
+  /**
+   * Set to true to log directly to stdout on VM. You can watch the results in real-time with:
+   * tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
+   * Only consulted by {@link #info} and {@link #error} after their own flag has let the
+   * message through.
+   */
+  private static final boolean LOG_TO_CONSOLE = false;
+
+  /**
+   * Log a {@link String#format}-style message at info level. Does nothing unless
+   * {@code LOG_INFO} is enabled; the message is additionally echoed to stdout when
+   * {@code LOG_TO_CONSOLE} is enabled.
+   */
+  public static void info(String format, Object... args) {
+    if (!LOG_INFO) {
+      return;
+    }
+    LOG.info(String.format(format, args));
+    if (!LOG_TO_CONSOLE) {
+      return;
+    }
+    System.out.println(String.format(format, args));
+  }
+
+  /**
+   * Log a {@link String#format}-style message at error level. Does nothing unless
+   * {@code LOG_ERROR} is enabled; the message is additionally echoed to stdout when
+   * {@code LOG_TO_CONSOLE} is enabled.
+   */
+  public static void error(String format, Object... args) {
+    if (!LOG_ERROR) {
+      return;
+    }
+    LOG.error(String.format(format, args));
+    if (!LOG_TO_CONSOLE) {
+      return;
+    }
+    System.out.println(String.format(format, args));
+  }
+
+  /**
+   * Log message to console. For client side only.
+   */
+  public static void console(String format, Object... args) {
+    System.out.printf("%s %s\n", Instant.now(), String.format(format, args));
+  }
+
+  /**
+   * Label to use for timestamps on pub/sub messages.
+   */
+  public static final String PUBSUB_TIMESTAMP = "timestamp";
+
+  /**
+   * Label to use for windmill ids on pub/sub messages.
+   */
+  public static final String PUBSUB_ID = "id";
+
+  /**
+   * All events will be given a timestamp relative to this time (ms since epoch).
+   * Used by {@code standardGeneratorConfig} unless wallclock event time is requested.
+   */
+  public static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
+
+  /**
+   * Instants guaranteed to be strictly before and after all event timestamps, and which won't
+   * be subject to underflow/overflow.
+   * This one lies 365 days after the epoch.
+   */
+  public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365));
+  /**
+   * Upper counterpart of {@code BEGINNING_OF_TIME}: 365 days before
+   * {@code BoundedWindow.TIMESTAMP_MAX_VALUE}.
+   */
+  public static final Instant END_OF_TIME =
+      BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365));
+
+  /**
+   * Setup pipeline with codes and some other options.
+   */
+  public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
+    PipelineRunner<?> runner = p.getRunner();
+    if (runner instanceof DirectPipelineRunner) {
+      // Disable randomization of output since we want to check batch and streaming match the
+      // model both locally and on the cloud.
+      ((DirectPipelineRunner) runner).withUnorderednessTesting(false);
+    }
+
+    CoderRegistry registry = p.getCoderRegistry();
+    switch (coderStrategy) {
+      case HAND:
+        registry.registerCoder(Auction.class, Auction.CODER);
+        registry.registerCoder(AuctionBid.class, AuctionBid.CODER);
+        registry.registerCoder(AuctionCount.class, AuctionCount.CODER);
+        registry.registerCoder(AuctionPrice.class, AuctionPrice.CODER);
+        registry.registerCoder(Bid.class, Bid.CODER);
+        registry.registerCoder(CategoryPrice.class, CategoryPrice.CODER);
+        registry.registerCoder(Event.class, Event.CODER);
+        registry.registerCoder(IdNameReserve.class, IdNameReserve.CODER);
+        registry.registerCoder(NameCityStateId.class, NameCityStateId.CODER);
+        registry.registerCoder(Person.class, Person.CODER);
+        registry.registerCoder(SellerPrice.class, SellerPrice.CODER);
+        registry.registerCoder(Done.class, Done.CODER);
+        registry.registerCoder(BidsPerSession.class, BidsPerSession.CODER);
+        break;
+      case AVRO:
+        registry.setFallbackCoderProvider(AvroCoder.PROVIDER);
+        break;
+      case JAVA:
+        registry.setFallbackCoderProvider(SerializableCoder.PROVIDER);
+        break;
+    }
+  }
+
+  /**
+   * Return a generator config to match the given {@code configuration}. Event times are based
+   * on the current wallclock time when {@code configuration.useWallclockEventTime} is set, and
+   * on the fixed {@code BASE_TIME} otherwise.
+   */
+  public static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
+    long baseTime;
+    if (configuration.useWallclockEventTime) {
+      baseTime = System.currentTimeMillis();
+    } else {
+      baseTime = BASE_TIME;
+    }
+    return new GeneratorConfig(configuration, baseTime, 0, configuration.numEvents, 0);
+  }
+
+  /**
+   * Return an iterator of events using the 'standard' generator config.
+   */
+  public static Iterator<TimestampedValue<Event>> standardEventIterator(
+      NexmarkConfiguration configuration) {
+    return new Generator(standardGeneratorConfig(configuration));
+  }
+
+  /**
+   * Return a transform which yields a finite number of synthesized events generated
+   * as a batch.
+   */
+  public static PTransform<PInput, PCollection<Event>> batchEventsSource(
+      String name, NexmarkConfiguration configuration) {
+    return Read
+        .from(new BoundedEventSource(
+            NexmarkUtils.standardGeneratorConfig(configuration), configuration.numEventGenerators))
+        .named(name + ".ReadBounded");
+  }
+
+  /**
+   * Return a transform which yields a finite number of synthesized events generated
+   * on-the-fly in real time.
+   */
+  public static PTransform<PInput, PCollection<Event>> streamEventsSource(
+      String name, NexmarkConfiguration configuration) {
+    return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration),
+                                              configuration.numEventGenerators,
+                                              configuration.watermarkHoldbackSec,
+                                              configuration.isRateLimited))
+               .named(name + ".ReadUnbounded");
+  }
+
+  /**
+   * Return a transform to pass-through events, but count them as they go by.
+   */
+  public static ParDo.Bound<Event, Event> snoop(final String name) {
+    return ParDo.named(name + ".Snoop")
+                .of(new DoFn<Event, Event>() {
+                  final Aggregator<Long, Long> eventCounter =
+                      createAggregator("events", new SumLongFn());
+                  final Aggregator<Long, Long> newPersonCounter =
+                      createAggregator("newPersons", new SumLongFn());
+                  final Aggregator<Long, Long> newAuctionCounter =
+                      createAggregator("newAuctions", new SumLongFn());
+                  final Aggregator<Long, Long> bidCounter =
+                      createAggregator("bids", new SumLongFn());
+                  final Aggregator<Long, Long> endOfStreamCounter =
+                      createAggregator("endOfStream", new SumLongFn());
+
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    eventCounter.addValue(1L);
+                    if (c.element().newPerson != null) {
+                      newPersonCounter.addValue(1L);
+                    } else if (c.element().newAuction != null) {
+                      newAuctionCounter.addValue(1L);
+                    } else if (c.element().bid != null) {
+                      bidCounter.addValue(1L);
+                    } else {
+                      endOfStreamCounter.addValue(1L);
+                    }
+                    info("%s snooping element %s", name, c.element());
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  /**
+   * Return a transform to count and discard each element.
+   */
+  public static <T> ParDo.Bound<T, Void> devNull(String name) {
+    return ParDo.named(name + ".DevNull")
+                .of(new DoFn<T, Void>() {
+                  final Aggregator<Long, Long> discardCounter =
+                      createAggregator("discarded", new SumLongFn());
+
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    discardCounter.addValue(1L);
+                  }
+                });
+  }
+
+  /**
+   * Return a transform to log each element, passing it through unchanged.
+   */
+  public static <T> ParDo.Bound<T, T> log(final String name) {
+    return ParDo.named(name + ".Log")
+                .of(new DoFn<T, T>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    error("%s: %s", name, c.element());
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  /**
+   * Return a transform to format each element as a string.
+   */
+  public static <T> ParDo.Bound<T, String> format(String name) {
+    return ParDo.named(name + ".Format")
+                .of(new DoFn<T, String>() {
+                  final Aggregator<Long, Long> recordCounter =
+                      createAggregator("records", new SumLongFn());
+
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    recordCounter.addValue(1L);
+                    c.output(c.element().toString());
+                  }
+                });
+  }
+
+  /**
+   * Return a transform to make explicit the timestamp of each element.
+   */
+  public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) {
+    return ParDo.named(name + ".Stamp")
+                .of(new DoFn<T, TimestampedValue<T>>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(TimestampedValue.of(c.element(), c.timestamp()));
+                  }
+                });
+  }
+
+  /**
+   * Return a transform to reduce a stream to a single, order-invariant long hash.
+   *
+   * <p>The input is re-windowed into the global window with a trigger that fires once at
+   * least {@code numEvents} elements have arrived (discarding fired panes, one day of allowed
+   * lateness). Each element is hashed from its timestamp and its {@code toString()} rendering,
+   * and the per-element hashes are combined with XOR, which makes the result independent of
+   * the order in which elements are seen.
+   *
+   * @param numEvents number of elements to wait for before firing; values beyond
+   *     {@link Integer#MAX_VALUE} are clamped to it, since the trigger takes an {@code int}
+   * @param name name of the returned transform
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<Long>> hash(
+      final long numEvents, String name) {
+    // A plain (int) cast would silently wrap for large counts (e.g. 1L << 32 becomes 0) and
+    // make the trigger fire far too early, so saturate instead.
+    final int triggerCount = (int) Math.min(numEvents, (long) Integer.MAX_VALUE);
+    return new PTransform<PCollection<T>, PCollection<Long>>(name) {
+      @Override
+      public PCollection<Long> apply(PCollection<T> input) {
+        return input.apply(Window.<T>into(new GlobalWindows())
+                               .triggering(AfterPane.elementCountAtLeast(triggerCount))
+                               .withAllowedLateness(Duration.standardDays(1))
+                               .discardingFiredPanes())
+
+                    .apply(ParDo.named(name + ".Hash").of(new DoFn<T, Long>() {
+                      @Override
+                      public void processElement(ProcessContext c) {
+                        long hash =
+                            Hashing.murmur3_128()
+                                   .newHasher()
+                                   .putLong(c.timestamp().getMillis())
+                                   .putString(c.element().toString(), StandardCharsets.UTF_8)
+                                   .hash()
+                                   .asLong();
+                        c.output(hash);
+                      }
+                    }))
+
+                    .apply(Combine.globally(new Combine.BinaryCombineFn<Long>() {
+                      @Override
+                      public Long apply(Long left, Long right) {
+                        // XOR is commutative and associative, hence order-invariant.
+                        return left ^ right;
+                      }
+                    }));
+      }
+    };
+  }
+
+  private static final long MASK = (1L << 16) - 1L;
+  private static final long HASH = 0x243F6A8885A308D3L;
+  private static final long INIT_PLAINTEXT = 50000L;
+
+  /**
+   * Return a transform to keep the CPU busy for given milliseconds on every record.
+   */
+  public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) {
+    return ParDo.named(name + ".CpuDelay")
+                .of(new DoFn<T, T>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    long now = System.currentTimeMillis();
+                    long end = now + delayMs;
+                    while (now < end) {
+                      // Find plaintext which hashes to HASH in lowest MASK bits.
+                      // Values chosen to roughly take 1ms on typical workstation.
+                      long p = INIT_PLAINTEXT;
+                      while (true) {
+                        long t = Hashing.murmur3_128().hashLong(p).asLong();
+                        if ((t & MASK) == (HASH & MASK)) {
+                          break;
+                        }
+                        p++;
+                      }
+                      long next = System.currentTimeMillis();
+                      now = next;
+                    }
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  /** Tag of the throw-away state cell that {@code diskBusy} writes into. */
+  private static final StateTag<Object, ValueState<byte[]>> DUMMY_TAG =
+      StateTags.value("dummy", ByteArrayCoder.of());
+  /** Largest chunk, in bytes, written by {@code diskBusy} in a single state write. */
+  private static final int MAX_BUFFER_SIZE = 1 << 24;
+
+  /**
+   * Return a transform to write given number of bytes to durable store on every record.
+   *
+   * <p>The bytes are written as a series of chunks of at most {@code MAX_BUFFER_SIZE} bytes
+   * into the {@code DUMMY_TAG} value state of the global state namespace. Each chunk is filled
+   * with the low byte of the current wallclock time in milliseconds. The record itself is
+   * passed through unchanged. Nothing is written when {@code bytes} is not positive.
+   *
+   * @param name prefix of the transform name ({@code name + ".DiskBusy"})
+   * @param bytes number of bytes to write per record
+   */
+  public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) {
+    return ParDo.named(name + ".DiskBusy")
+                .of(new DoFn<T, T>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    long remain = bytes;
+                    long now = System.currentTimeMillis();
+                    while (remain > 0) {
+                      // Bounded by MAX_BUFFER_SIZE, so the narrowing cast cannot overflow.
+                      int thisBytes = (int) Math.min(remain, MAX_BUFFER_SIZE);
+                      remain -= thisBytes;
+                      byte[] arr = new byte[thisBytes];
+                      java.util.Arrays.fill(arr, (byte) now);
+                      ValueState<byte[]> state = c.windowingInternals().stateInternals().state(
+                          StateNamespaces.global(), DUMMY_TAG);
+                      state.write(arr);
+                      now = System.currentTimeMillis();
+                    }
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  /**
+   * Return a transform to cast each element to {@link KnownSize}.
+   */
+  private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize(
+      final String name) {
+    return ParDo.named(name + ".Forget")
+                .of(new DoFn<T, KnownSize>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  /**
+   * A coder for {@link KnownSize} values which are really instances of {@code T}. All of the
+   * encoding and decoding work is delegated to the coder for {@code T}.
+   *
+   * @param <T> True type of object.
+   */
+  private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> {
+    private final Coder<T> trueCoder;
+
+    public CastingCoder(Coder<T> trueCoder) {
+      this.trueCoder = trueCoder;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void encode(KnownSize value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      // Unchecked: values handed to this coder are expected to be instances of T.
+      trueCoder.encode((T) value, outStream, context);
+    }
+
+    @Override
+    public KnownSize decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      T decoded = trueCoder.decode(inStream, context);
+      return decoded;
+    }
+
+    @Override
+    public List<? extends Coder<?>> getComponents() {
+      return ImmutableList.of(trueCoder);
+    }
+  }
+
+  /**
+   * Wrap {@code trueCoder} so that it can serve as a coder for {@code KnownSize} values which
+   * are known to be exactly of type {@code T}.
+   */
+  private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) {
+    Coder<KnownSize> castingCoder = new CastingCoder<T>(trueCoder);
+    return castingCoder;
+  }
+
+  /**
+   * Return {@code elements} as a collection of {@code KnownSize}s, encoded with a casting
+   * wrapper around the coder of {@code elements}.
+   */
+  public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize(
+      final String name, PCollection<T> elements) {
+    PCollection<KnownSize> forgotten = elements.apply(castToKnownSize(name));
+    return forgotten.setCoder(makeCastingCoder(elements.getCoder()));
+  }
+
+  // Do not instantiate: this class only holds static helpers, nested types and constants.
+  private NexmarkUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java
new file mode 100644
index 0000000..4f5304d
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Options.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PubsubOptions;
+import javax.annotation.Nullable;
+
+/**
+ * Command line flags.
+ */
+public interface Options extends PubsubOptions {
+  @Description("Which suite to run. Default is to use command line arguments for one job.")
+  @Default.Enum("DEFAULT")
+  NexmarkSuite getSuite();
+
+  void setSuite(NexmarkSuite suite);
+
+  @Description("If true, and using the DataflowPipelineRunner, monitor the jobs as they run.")
+  @Default.Boolean(false)
+  boolean getMonitorJobs();
+
+  void setMonitorJobs(boolean monitorJobs);
+
+  @Description("Where the events come from.")
+  @Nullable
+  NexmarkUtils.SourceType getSourceType();
+
+  void setSourceType(NexmarkUtils.SourceType sourceType);
+
+  @Description("Prefix for input files if using avro input")
+  @Nullable
+  String getInputPath();
+
+  void setInputPath(String inputPath);
+
+  @Description("Where results go.")
+  @Nullable
+  NexmarkUtils.SinkType getSinkType();
+
+  void setSinkType(NexmarkUtils.SinkType sinkType);
+
+  @Description("Which mode to run in when source is PUBSUB.")
+  @Nullable
+  NexmarkUtils.PubSubMode getPubSubMode();
+
+  void setPubSubMode(NexmarkUtils.PubSubMode pubSubMode);
+
+  @Description("Which query to run.")
+  @Nullable
+  Integer getQuery();
+
+  void setQuery(Integer query);
+
+  @Description("Prefix for output files if using text output for results or running Query 10.")
+  @Nullable
+  String getOutputPath();
+
+  void setOutputPath(String outputPath);
+
+  @Description("Base name of pubsub topic to publish to in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubTopic();
+
+  void setPubsubTopic(String pubsubTopic);
+
+  @Description("Base name of pubsub subscription to read from in streaming mode.")
+  @Nullable
+  @Default.String("nexmark")
+  String getPubsubSubscription();
+
+  void setPubsubSubscription(String pubsubSubscription);
+
+  @Description("Base name of BigQuery table name if using BigQuery output.")
+  @Nullable
+  @Default.String("nexmark")
+  String getBigQueryTable();
+
+  void setBigQueryTable(String bigQueryTable);
+
+  @Description("Approximate number of events to generate. "
+               + "Zero for effectively unlimited in streaming mode.")
+  @Nullable
+  Long getNumEvents();
+
+  void setNumEvents(Long numEvents);
+
+  @Description("Time in seconds to preload the subscription with data, at the initial input rate "
+               + "of the pipeline.")
+  @Nullable
+  Integer getPreloadSeconds();
+
+  void setPreloadSeconds(Integer preloadSeconds);
+
+  @Description("Number of unbounded sources to create events.")
+  @Nullable
+  Integer getNumEventGenerators();
+
+  void setNumEventGenerators(Integer numEventGenerators);
+
+  @Description("Shape of event rate curve.")
+  @Nullable
+  NexmarkUtils.RateShape getRateShape();
+
+  void setRateShape(NexmarkUtils.RateShape rateShape);
+
+  @Description("Initial overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getFirstEventRate();
+
+  void setFirstEventRate(Integer firstEventRate);
+
+  @Description("Next overall event rate (in --rateUnit).")
+  @Nullable
+  Integer getNextEventRate();
+
+  void setNextEventRate(Integer nextEventRate);
+
+  @Description("Unit for rates.")
+  @Nullable
+  NexmarkUtils.RateUnit getRateUnit();
+
+  void setRateUnit(NexmarkUtils.RateUnit rateUnit);
+
+  @Description("Overall period of rate shape, in seconds.")
+  @Nullable
+  Integer getRatePeriodSec();
+
+  void setRatePeriodSec(Integer ratePeriodSec);
+
+  @Description("If true, relay events in real time in streaming mode.")
+  @Nullable
+  Boolean getIsRateLimited();
+
+  void setIsRateLimited(Boolean isRateLimited);
+
+  @Description("If true, use wallclock time as event time. Otherwise, use a deterministic"
+               + " time in the past so that multiple runs will see exactly the same event streams"
+               + " and should thus have exactly the same results.")
+  @Nullable
+  Boolean getUseWallclockEventTime();
+
+  void setUseWallclockEventTime(Boolean useWallclockEventTime);
+
+  @Description("Assert pipeline results match model results.")
+  @Nullable
+  boolean getAssertCorrectness();
+
+  void setAssertCorrectness(boolean assertCorrectness);
+
+  @Description("Log all input events.")
+  @Nullable
+  boolean getLogEvents();
+
+  void setLogEvents(boolean logEvents);
+
+  @Description("Log all query results.")
+  @Nullable
+  boolean getLogResults();
+
+  void setLogResults(boolean logResults);
+
+  @Description("Average size in bytes for a person record.")
+  @Nullable
+  Integer getAvgPersonByteSize();
+
+  void setAvgPersonByteSize(Integer avgPersonByteSize);
+
+  @Description("Average size in bytes for an auction record.")
+  @Nullable
+  Integer getAvgAuctionByteSize();
+
+  void setAvgAuctionByteSize(Integer avgAuctionByteSize);
+
+  @Description("Average size in bytes for a bid record.")
+  @Nullable
+  Integer getAvgBidByteSize();
+
+  void setAvgBidByteSize(Integer avgBidByteSize);
+
+  @Description("Ratio of bids for 'hot' auctions above the background.")
+  @Nullable
+  Integer getHotAuctionRatio();
+
+  void setHotAuctionRatio(Integer hotAuctionRatio);
+
+  @Description("Ratio of auctions for 'hot' sellers above the background.")
+  @Nullable
+  Integer getHotSellersRatio();
+
+  void setHotSellersRatio(Integer hotSellersRatio);
+
+  @Description("Ratio of auctions for 'hot' bidders above the background.")
+  @Nullable
+  Integer getHotBiddersRatio();
+
+  void setHotBiddersRatio(Integer hotBiddersRatio);
+
+  @Description("Window size in seconds.")
+  @Nullable
+  Long getWindowSizeSec();
+
+  void setWindowSizeSec(Long windowSizeSec);
+
+  @Description("Window period in seconds.")
+  @Nullable
+  Long getWindowPeriodSec();
+
+  void setWindowPeriodSec(Long windowPeriodSec);
+
+  @Description("If in streaming mode, the holdback for watermark in seconds.")
+  @Nullable
+  Long getWatermarkHoldbackSec();
+
+  void setWatermarkHoldbackSec(Long watermarkHoldbackSec);
+
+  @Description("Roughly how many auctions should be in flight for each generator.")
+  @Nullable
+  Integer getNumInFlightAuctions();
+
+  void setNumInFlightAuctions(Integer numInFlightAuctions);
+
+
+  @Description("Maximum number of people to consider as active for placing auctions or bids.")
+  @Nullable
+  Integer getNumActivePeople();
+
+  void setNumActivePeople(Integer numActivePeople);
+
+  @Description("Filename of perf data to append to.")
+  @Nullable
+  String getPerfFilename();
+
+  void setPerfFilename(String perfFilename);
+
+  @Description("Filename of baseline perf data to read from.")
+  @Nullable
+  String getBaselineFilename();
+
+  void setBaselineFilename(String baselineFilename);
+
+  @Description("Filename of summary perf data to append to.")
+  @Nullable
+  String getSummaryFilename();
+
+  void setSummaryFilename(String summaryFilename);
+
+  @Description("Filename for javascript capturing all perf data and any baselines.")
+  @Nullable
+  String getJavascriptFilename();
+
+  void setJavascriptFilename(String javascriptFilename);
+
+  @Description("If true, don't run the actual query. Instead, calculate the distribution "
+               + "of number of query results per (event time) minute according to the query model.")
+  @Nullable
+  boolean getJustModelResultRate();
+
+  void setJustModelResultRate(boolean justModelResultRate);
+
+  @Description("Coder strategy to use.")
+  @Nullable
+  NexmarkUtils.CoderStrategy getCoderStrategy();
+
+  void setCoderStrategy(NexmarkUtils.CoderStrategy coderStrategy);
+
+  @Description("Delay, in milliseconds, for each event. We will peg one core for this "
+               + "number of milliseconds to simulate CPU-bound computation.")
+  @Nullable
+  Long getCpuDelayMs();
+
+  void setCpuDelayMs(Long cpuDelayMs);
+
+  @Description("Extra data, in bytes, to save to persistent state for each event. "
+               + "This will force I/O all the way to durable storage to simulate an "
+               + "I/O-bound computation.")
+  @Nullable
+  Long getDiskBusyBytes();
+
+  void setDiskBusyBytes(Long diskBusyBytes);
+
+  @Description("Skip factor for query 2. We select bids for every {@code auctionSkip}'th auction")
+  @Nullable
+  Integer getAuctionSkip();
+
+  void setAuctionSkip(Integer auctionSkip);
+
+  @Description("Fanout for queries 4 (groups by category id) and 7 (finds a global maximum).")
+  @Nullable
+  Integer getFanout();
+
+  void setFanout(Integer fanout);
+
+  @Description("Length of occasional delay to impose on events (in seconds).")
+  @Nullable
+  Long getOccasionalDelaySec();
+
+  void setOccasionalDelaySec(Long occasionalDelaySec);
+
+  @Description("Probability that an event will be delayed by delayS.")
+  @Nullable
+  Double getProbDelayedEvent();
+
+  void setProbDelayedEvent(Double probDelayedEvent);
+
+  @Description("Maximum size of each log file (in events). For Query10 only.")
+  @Nullable
+  Integer getMaxLogEvents();
+
+  void setMaxLogEvents(Integer maxLogEvents);
+
+  @Description("How to derive names of resources.")
+  @Default.Enum("QUERY_AND_SALT")
+  NexmarkUtils.ResourceNameMode getResourceNameMode();
+
+  void setResourceNameMode(NexmarkUtils.ResourceNameMode mode);
+
+  @Description("If true, manage the creation and cleanup of topics, subscriptions and gcs files.")
+  @Default.Boolean(true)
+  boolean getManageResources();
+
+  void setManageResources(boolean manageResources);
+
+  @Description("If true, use pub/sub publish time instead of event time.")
+  @Nullable
+  Boolean getUsePubsubPublishTime();
+
+  void setUsePubsubPublishTime(Boolean usePubsubPublishTime);
+
+  @Description("Number of events in out-of-order groups. 1 implies no out-of-order events. "
+               + "1000 implies every 1000 events per generator are emitted in pseudo-random order.")
+  @Nullable
+  Long getOutOfOrderGroupSize();
+
+  void setOutOfOrderGroupSize(Long outOfOrderGroupSize);
+
+  @Description("If false, do not add the Monitor and Snoop transforms.")
+  @Nullable
+  Boolean getDebug();
+
+  void setDebug(Boolean value);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java
new file mode 100644
index 0000000..6fcf388
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Person.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A person either creating an auction or making a bid.
+ */
+public class Person implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  public static final Coder<Person> CODER = new AtomicCoder<Person>() {
+    @Override
+    public void encode(Person value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.name, outStream, Context.NESTED);
+      STRING_CODER.encode(value.emailAddress, outStream, Context.NESTED);
+      STRING_CODER.encode(value.creditCard, outStream, Context.NESTED);
+      STRING_CODER.encode(value.city, outStream, Context.NESTED);
+      STRING_CODER.encode(value.state, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Person decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      String name = STRING_CODER.decode(inStream, Context.NESTED);
+      String emailAddress = STRING_CODER.decode(inStream, Context.NESTED);
+      String creditCard = STRING_CODER.decode(inStream, Context.NESTED);
+      String city = STRING_CODER.decode(inStream, Context.NESTED);
+      String state = STRING_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Person(id, name, emailAddress, creditCard, city, state, dateTime, extra);
+    }
+  };
+
+  /** Id of person. */
+  @JsonProperty
+  public final long id; // primary key
+
+  /** Extra person properties. */
+  @JsonProperty
+  public final String name;
+
+  @JsonProperty
+  public final String emailAddress;
+
+  @JsonProperty
+  public final String creditCard;
+
+  @JsonProperty
+  public final String city;
+
+  @JsonProperty
+  public final String state;
+
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Person() {
+    id = 0;
+    name = null;
+    emailAddress = null;
+    creditCard = null;
+    city = null;
+    state = null;
+    dateTime = 0;
+    extra = null;
+  }
+
+  public Person(long id, String name, String emailAddress, String creditCard, String city,
+      String state, long dateTime, String extra) {
+    this.id = id;
+    this.name = name;
+    this.emailAddress = emailAddress;
+    this.creditCard = creditCard;
+    this.city = city;
+    this.state = state;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of person which capture the given annotation.
+   * (Used for debugging).
+   */
+  public Person withAnnotation(String annotation) {
+    return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+        annotation + ": " + extra);
+  }
+
+  /**
+   * Does person have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from person. (Used for debugging.)
+   */
+  public Person withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      return new Person(id, name, emailAddress, creditCard, city, state, dateTime,
+          extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + name.length() + 1 + emailAddress.length() + 1 + creditCard.length() + 1
+        + city.length() + 1 + state.length() + 8 + 1 + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
new file mode 100644
index 0000000..1255154
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/PubsubHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubJsonClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Helper for working with pubsub: creates, reuses and checks topics and subscriptions, and
+ * on {@link #close} deletes those which were registered for cleanup.
+ */
+public class PubsubHelper implements AutoCloseable {
+  /** Underlying pub/sub client. */
+  private final PubsubClient pubsubClient;
+
+  /** Project id. */
+  private final String projectId;
+
+  /** Topics we should delete on close. */
+  private final List<PubsubClient.TopicPath> createdTopics;
+
+  /** Subscriptions we should delete on close. */
+  private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
+
+  private PubsubHelper(PubsubClient pubsubClient, String projectId) {
+    this.pubsubClient = pubsubClient;
+    this.projectId = projectId;
+    createdTopics = new ArrayList<>();
+    createdSubscriptions = new ArrayList<>();
+  }
+
+  /**
+   * Create a helper for the project configured in {@code options}.
+   *
+   * @throws RuntimeException if the underlying Pubsub client cannot be created
+   */
+  public static PubsubHelper create(PubsubOptions options) {
+    try {
+      return new PubsubHelper(
+          PubsubJsonClient.FACTORY.newClient(null, null, options),
+          options.getProject());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create Pubsub client: ", e);
+    }
+  }
+
+  /**
+   * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
+   * deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.TopicPath createTopic(String shortTopic) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    try {
+      if (topicExists(shortTopic)) {
+        NexmarkUtils.console("attempting to cleanup topic %s", topic);
+        pubsubClient.deleteTopic(topic);
+      }
+      NexmarkUtils.console("create topic %s", topic);
+      pubsubClient.createTopic(topic);
+      createdTopics.add(topic);
+      return topic;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
+    }
+  }
+
+  /**
+   * Create a topic from short name if it does not already exist. The topic will not be
+   * deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    try {
+      if (topicExists(shortTopic)) {
+        NexmarkUtils.console("topic %s already exists", topic);
+        return topic;
+      }
+      NexmarkUtils.console("create topic %s", topic);
+      pubsubClient.createTopic(topic);
+      return topic;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
+    }
+  }
+
+  /**
+   * Check a topic corresponding to short name exists, and throw exception if not. The
+   * topic will not be deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.TopicPath reuseTopic(String shortTopic) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    if (topicExists(shortTopic)) {
+      NexmarkUtils.console("reusing existing topic %s", topic);
+      return topic;
+    }
+    throw new RuntimeException("topic '" + topic + "' does not already exist");
+  }
+
+  /**
+   * Does topic corresponding to short name exist?
+   */
+  public boolean topicExists(String shortTopic) {
+    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    try {
+      Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
+      return existingTopics.contains(topic);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
+    }
+  }
+
+  /**
+   * Create subscription from short name. Delete subscription if it already exists. Ensure the
+   * subscription will be deleted on cleanup. Return full subscription name.
+   */
+  public PubsubClient.SubscriptionPath createSubscription(
+      String shortTopic, String shortSubscription) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    PubsubClient.SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+    try {
+      if (subscriptionExists(shortTopic, shortSubscription)) {
+        NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
+        pubsubClient.deleteSubscription(subscription);
+      }
+      NexmarkUtils.console("create subscription %s", subscription);
+      pubsubClient.createSubscription(topic, subscription, 60);
+      createdSubscriptions.add(subscription);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
+    }
+    return subscription;
+  }
+
+  /**
+   * Check a subscription corresponding to short name exists, and throw exception if not. The
+   * subscription will not be deleted on cleanup. Return full subscription name.
+   */
+  public PubsubClient.SubscriptionPath reuseSubscription(
+      String shortTopic, String shortSubscription) {
+    PubsubClient.SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+    if (subscriptionExists(shortTopic, shortSubscription)) {
+      NexmarkUtils.console("reusing existing subscription %s", subscription);
+      return subscription;
+    }
+    throw new RuntimeException("subscription '" + subscription + "' does not already exist");
+  }
+
+  /**
+   * Does subscription corresponding to short name exist?
+   */
+  public boolean subscriptionExists(String shortTopic, String shortSubscription) {
+    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    PubsubClient.SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+    try {
+      Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
+          pubsubClient.listSubscriptions(project, topic);
+      return existingSubscriptions.contains(subscription);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to check Pubsub subscription " + subscription + ": ", e);
+    }
+  }
+
+  /**
+   * Delete all the subscriptions and topics we created. A deletion which fails is reported
+   * on the console, together with its cause, and does not stop the remaining deletions.
+   * The bookkeeping is then cleared so that closing again does not repeat the deletions.
+   */
+  @Override
+  public void close() {
+    for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
+      try {
+        NexmarkUtils.console("delete subscription %s", subscription);
+        pubsubClient.deleteSubscription(subscription);
+      } catch (IOException ex) {
+        NexmarkUtils.console("could not delete subscription %s: %s", subscription, ex);
+      }
+    }
+    // Forget what was registered, whether or not each deletion succeeded.
+    createdSubscriptions.clear();
+    for (PubsubClient.TopicPath topic : createdTopics) {
+      try {
+        NexmarkUtils.console("delete topic %s", topic);
+        pubsubClient.deleteTopic(topic);
+      } catch (IOException ex) {
+        NexmarkUtils.console("could not delete topic %s: %s", topic, ex);
+      }
+    }
+    createdTopics.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java
new file mode 100644
index 0000000..ea0d7ca
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.values.PCollection;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Query 0: emit every event unchanged, but only after encoding and decoding it with the
+ * input collection's coder, so that the cost of the chosen coder shows up in measurements.
+ */
+public class Query0 extends NexmarkQuery {
+  public Query0(NexmarkConfiguration configuration) {
+    super(configuration, "Query0");
+  }
+
+  private PCollection<Event> applyTyped(PCollection<Event> events) {
+    final Coder<Event> eventCoder = events.getCoder();
+
+    // Encodes each element to bytes, records the encoded length, then decodes and emits it.
+    DoFn<Event, Event> roundTrip = new DoFn<Event, Event>() {
+      private final Aggregator<Long, Long> bytes =
+          createAggregator("bytes", new SumLongFn());
+
+      @Override
+      public void processElement(ProcessContext c) throws CoderException, IOException {
+        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+        eventCoder.encode(c.element(), encoded, Coder.Context.OUTER);
+        byte[] payload = encoded.toByteArray();
+        bytes.addValue((long) payload.length);
+
+        Event decoded =
+            eventCoder.decode(new ByteArrayInputStream(payload), Coder.Context.OUTER);
+        c.output(decoded);
+      }
+    };
+
+    return events.apply(ParDo.named(name + ".Serialize").of(roundTrip));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<Event> roundTripped = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, roundTripped);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
new file mode 100644
index 0000000..f3ceca2
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query0Model.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Direct model of {@link Query0}: every input event is added to the results unchanged.
+ */
+public class Query0Model extends NexmarkQueryModel {
+  /**
+   * Simulator which forwards each input event straight to the results.
+   */
+  private class Simulator extends AbstractSimulator<Event, Event> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next != null) {
+        addResult(next);
+      } else {
+        allDone();
+      }
+    }
+  }
+
+  public Query0Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  protected AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java
new file mode 100644
index 0000000..7e60b9c
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 1, 'Currency Conversion': replace the dollar price of each bid with its euro value.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
+ * FROM bid [ROWS UNBOUNDED];
+ * </pre>
+ *
+ * <p>Here the conversion is a fixed scaling of the price by 89/100 (multiply, then
+ * divide); all other bid fields are carried over unchanged.
+ */
+class Query1 extends NexmarkQuery {
+  public Query1(NexmarkConfiguration configuration) {
+    super(configuration, "Query1");
+  }
+
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+    // Scales a bid's price by 89/100 and copies every other field across untouched.
+    DoFn<Bid, Bid> toEuros = new DoFn<Bid, Bid>() {
+      @Override
+      public void processElement(ProcessContext c) {
+        Bid dollarBid = c.element();
+        c.output(new Bid(dollarBid.auction, dollarBid.bidder,
+            (dollarBid.price * 89) / 100, dollarBid.dateTime, dollarBid.extra));
+      }
+    };
+
+    // Keep only the bid events, then convert each of them.
+    return events
+        .apply(JUST_BIDS)
+        .apply(ParDo.named(name + ".ToEuros").of(toEuros));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<Bid> euroBids = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, euroBids);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java
new file mode 100644
index 0000000..74fb28c
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query10.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnWithContext;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum.SumLongFn;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel;
+import com.google.common.base.Preconditions;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+
+/**
+ * Query "10", 'Log to sharded files' (Not in original suite.)
+ *
+ * <p>Events are spread over {@code maxNumWorkers * NUM_SHARDS_PER_WORKER} shards and windowed
+ * into fixed windows of {@code windowSizeSec} seconds. Each pane of each shard is saved to its
+ * own log file, and one index file listing those log files is written per window.
+ */
+class Query10 extends NexmarkQuery {
+  /** Upload buffer size applied to every GCS write channel. */
+  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
+  /** Number of log shards created for each worker. */
+  private static final int NUM_SHARDS_PER_WORKER = 5;
+  /** Processing-time delay used to batch up elements once the watermark has passed a window. */
+  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
+
+  /**
+   * Capture everything we need to know about the records in a single output file.
+   */
+  private static class OutputFile implements Serializable {
+    /** Maximum possible timestamp of records in file. */
+    private final Instant maxTimestamp;
+    /** Shard within window. */
+    private final String shard;
+    /** Index of file in all files in shard. */
+    private final long index;
+    /** Timing of records in this file. */
+    private final PaneInfo.Timing timing;
+    /** Path to file containing records, or {@literal null} if no output required. */
+    @Nullable
+    private final String filename;
+
+    public OutputFile(
+        Instant maxTimestamp,
+        String shard,
+        long index,
+        PaneInfo.Timing timing,
+        @Nullable String filename) {
+      this.maxTimestamp = maxTimestamp;
+      this.shard = shard;
+      this.index = index;
+      this.timing = timing;
+      this.filename = filename;
+    }
+
+    /** Returns the newline-terminated line written to the index file for this output file. */
+    @Override
+    public String toString() {
+      return String.format("%s %s %d %s %s\n", maxTimestamp, shard, index, timing, filename);
+    }
+  }
+
+  /**
+   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
+   */
+  @Nullable
+  private String outputPath;
+
+  /**
+   * Maximum number of workers, used to determine log sharding factor. Must be set to a
+   * positive value via {@link #setMaxNumWorkers} before the query is applied.
+   */
+  private int maxNumWorkers;
+
+  public Query10(NexmarkConfiguration configuration) {
+    super(configuration, "Query10");
+  }
+
+  public void setOutputPath(@Nullable String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public void setMaxNumWorkers(int maxNumWorkers) {
+    this.maxNumWorkers = maxNumWorkers;
+  }
+
+  /**
+   * Return channel for writing bytes to GCS.
+   *
+   * @param options options used to construct the GCS channel factory
+   * @param filename GCS path of the file to create
+   * @throws IOException if the channel cannot be created
+   */
+  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
+      throws IOException {
+    WritableByteChannel channel = new GcsIOChannelFactory(options).create(filename, "text/plain");
+    Preconditions.checkState(channel instanceof GoogleCloudStorageWriteChannel);
+    ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER);
+    return channel;
+  }
+
+  /** Return a short string to describe {@code timing}. */
+  private String timingToString(PaneInfo.Timing timing) {
+    switch (timing) {
+      case EARLY:
+        return "E";
+      case ON_TIME:
+        return "O";
+      case LATE:
+        return "L";
+      case UNKNOWN:
+        // Panes which did not come out of a trigger firing carry no timing information.
+        return "U";
+    }
+    throw new IllegalStateException("Unexpected pane timing: " + timing);
+  }
+
+  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
+  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
+    // The trailing random hex component presumably keeps retried writes of the same pane
+    // from colliding on one filename -- TODO confirm.
+    @Nullable String filename =
+        outputPath == null
+        ? null
+        : String.format("%s/LOG-%s-%s-%03d-%s-%x",
+            outputPath, window.maxTimestamp(), shard, pane.getIndex(),
+            timingToString(pane.getTiming()),
+            ThreadLocalRandom.current().nextLong());
+    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
+        pane.getTiming(), filename);
+  }
+
+  /**
+   * Return path to which we should write the index for {@code window}, or {@literal null}
+   * if no output required.
+   */
+  @Nullable
+  private String indexPathFor(BoundedWindow window) {
+    if (outputPath == null) {
+      return null;
+    }
+    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
+  }
+
+  /**
+   * Build the query: shard the events, group each shard per window pane, write each pane to
+   * a log file, then write one index file per window and emit a {@link Done} for it.
+   *
+   * @throws IllegalStateException if {@code maxNumWorkers} has not been set to a positive value
+   */
+  private PCollection<Done> applyTyped(PCollection<Event> events) {
+    // Fail while constructing the pipeline rather than with a '/ by zero' on every element.
+    Preconditions.checkState(maxNumWorkers > 0,
+        "maxNumWorkers must be positive but was %s; call setMaxNumWorkers first", maxNumWorkers);
+    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
+
+    return events
+        .apply(ParDo.named(name + ".ShardEvents")
+                    .of(new DoFn<Event, KV<String, Event>>() {
+                      final Aggregator<Long, Long> lateCounter =
+                          createAggregator("actuallyLateEvent", new SumLongFn());
+                      final Aggregator<Long, Long> onTimeCounter =
+                          createAggregator("actuallyOnTimeEvent", new SumLongFn());
+
+                      @Override
+                      public void processElement(ProcessContext c) {
+                        if (c.element().hasAnnotation("LATE")) {
+                          lateCounter.addValue(1L);
+                          NexmarkUtils.error("Observed late: %s", c.element());
+                        } else {
+                          onTimeCounter.addValue(1L);
+                        }
+                        // The hash is widened to long before taking the remainder, so the
+                        // absolute value can never overflow.
+                        int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+                        String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+                        c.output(KV.of(shard, c.element()));
+                      }
+                    }))
+        // Until the watermark passes the end of the window, fire every maxLogEvents elements.
+        // After that, fire on maxLogEvents elements or LATE_BATCHING_PERIOD of processing
+        // time after the first element in the pane, whichever comes first.
+        .apply(Window.<KV<String, Event>>into(
+            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+            .named(name + ".WindowEvents")
+            .triggering(AfterEach.inOrder(
+                Repeatedly
+                    .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+                    .orFinally(AfterWatermark.pastEndOfWindow()),
+                Repeatedly.forever(
+                    AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+                        AfterProcessingTime.pastFirstElementInPane()
+                                           .plusDelayOf(LATE_BATCHING_PERIOD)))))
+            .discardingFiredPanes()
+            // Use a 1 day allowed lateness so that any forgotten hold will stall the
+            // pipeline for that period and be very noticeable.
+            .withAllowedLateness(Duration.standardDays(1)))
+        .apply(GroupByKey.<String, Event>create())
+        .apply(
+            ParDo.named(name + ".CheckForLateEvents")
+                 .of(new DoFnWithContext<KV<String, Iterable<Event>>,
+                     KV<String, Iterable<Event>>>() {
+                   final Aggregator<Long, Long> earlyCounter =
+                       createAggregator("earlyShard", new SumLongFn());
+                   final Aggregator<Long, Long> onTimeCounter =
+                       createAggregator("onTimeShard", new SumLongFn());
+                   final Aggregator<Long, Long> lateCounter =
+                       createAggregator("lateShard", new SumLongFn());
+                   final Aggregator<Long, Long> unexpectedLatePaneCounter =
+                       createAggregator("ERROR_unexpectedLatePane", new SumLongFn());
+                   final Aggregator<Long, Long> unexpectedOnTimeElementCounter =
+                       createAggregator("ERROR_unexpectedOnTimeElement", new SumLongFn());
+
+                   @ProcessElement
+                   public void processElement(ProcessContext c, BoundedWindow window) {
+                     // Count how many events in this pane were annotated as late.
+                     int numLate = 0;
+                     int numOnTime = 0;
+                     for (Event event : c.element().getValue()) {
+                       if (event.hasAnnotation("LATE")) {
+                         numLate++;
+                       } else {
+                         numOnTime++;
+                       }
+                     }
+                     String shard = c.element().getKey();
+                     NexmarkUtils.error(
+                         "%s with timestamp %s has %d actually late and %d on-time "
+                             + "elements in pane %s for window %s",
+                         shard, c.timestamp(), numLate, numOnTime, c.pane(),
+                         window.maxTimestamp());
+                     if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+                       // A late pane should hold late events and nothing else.
+                       if (numLate == 0) {
+                         NexmarkUtils.error(
+                             "ERROR! No late events in late pane for %s", shard);
+                         unexpectedLatePaneCounter.addValue(1L);
+                       }
+                       if (numOnTime > 0) {
+                         NexmarkUtils.error(
+                             "ERROR! Have %d on-time events in late pane for %s",
+                             numOnTime, shard);
+                         unexpectedOnTimeElementCounter.addValue(1L);
+                       }
+                       lateCounter.addValue(1L);
+                     } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+                       // Early panes only fire once maxLogEvents elements have accumulated.
+                       if (numOnTime + numLate < configuration.maxLogEvents) {
+                         NexmarkUtils.error(
+                             "ERROR! Only have %d events in early pane for %s",
+                             numOnTime + numLate, shard);
+                       }
+                       earlyCounter.addValue(1L);
+                     } else {
+                       onTimeCounter.addValue(1L);
+                     }
+                     c.output(c.element());
+                   }
+                 }))
+        .apply(
+            ParDo.named(name + ".UploadEvents")
+                 .of(new DoFnWithContext<KV<String, Iterable<Event>>,
+                     KV<Void, OutputFile>>() {
+                   final Aggregator<Long, Long> savedFileCounter =
+                       createAggregator("savedFile", new SumLongFn());
+                   final Aggregator<Long, Long> writtenRecordsCounter =
+                       createAggregator("writtenRecords", new SumLongFn());
+
+                   @ProcessElement
+                   public void process(ProcessContext c, BoundedWindow window) throws IOException {
+                     String shard = c.element().getKey();
+                     GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+                     OutputFile outputFile = outputFileFor(window, shard, c.pane());
+                     NexmarkUtils.error(
+                         "Writing %s with record timestamp %s, window timestamp %s, pane %s",
+                         shard, c.timestamp(), window.maxTimestamp(), c.pane());
+                     // A null filename means no output path was configured: skip the write
+                     // but still emit the OutputFile so the window gets indexed.
+                     if (outputFile.filename != null) {
+                       NexmarkUtils.error("Beginning write to '%s'", outputFile.filename);
+                       int n = 0;
+                       try (OutputStream output =
+                                Channels.newOutputStream(openWritableGcsFile(options, outputFile
+                                    .filename))) {
+                         for (Event event : c.element().getValue()) {
+                           Event.CODER.encode(event, output, Coder.Context.OUTER);
+                           writtenRecordsCounter.addValue(1L);
+                           if (++n % 10000 == 0) {
+                             NexmarkUtils.error("So far written %d records to '%s'", n,
+                                 outputFile.filename);
+                           }
+                         }
+                       }
+                       NexmarkUtils.error("Written all %d records to '%s'", n, outputFile.filename);
+                     }
+                     savedFileCounter.addValue(1L);
+                     c.output(KV.<Void, OutputFile>of(null, outputFile));
+                   }
+                 }))
+        // Clear fancy triggering from above.
+        .apply(Window.<KV<Void, OutputFile>>into(
+            FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+            .named(name + ".WindowLogFiles")
+            .triggering(AfterWatermark.pastEndOfWindow())
+            // We expect no late data here, but we'll assume the worst so we can detect any.
+            .withAllowedLateness(Duration.standardDays(1))
+            .discardingFiredPanes())
+        .apply(GroupByKey.<Void, OutputFile>create())
+        .apply(
+            ParDo.named(name + ".Index")
+                 .of(new DoFnWithContext<KV<Void, Iterable<OutputFile>>, Done>() {
+                   final Aggregator<Long, Long> unexpectedLateCounter =
+                       createAggregator("ERROR_unexpectedLate", new SumLongFn());
+                   final Aggregator<Long, Long> unexpectedEarlyCounter =
+                       createAggregator("ERROR_unexpectedEarly", new SumLongFn());
+                   final Aggregator<Long, Long> unexpectedIndexCounter =
+                       createAggregator("ERROR_unexpectedIndex", new SumLongFn());
+                   final Aggregator<Long, Long> finalizedCounter =
+                       createAggregator("indexed", new SumLongFn());
+
+                   @ProcessElement
+                   public void process(ProcessContext c, BoundedWindow window) throws IOException {
+                     // Late panes, early panes and ON_TIME panes with a non-zero index are
+                     // counted as errors and not indexed.
+                     if (c.pane().getTiming() == Timing.LATE) {
+                       unexpectedLateCounter.addValue(1L);
+                       NexmarkUtils.error("ERROR! Unexpected LATE pane: %s", c.pane());
+                     } else if (c.pane().getTiming() == Timing.EARLY) {
+                       unexpectedEarlyCounter.addValue(1L);
+                       NexmarkUtils.error("ERROR! Unexpected EARLY pane: %s", c.pane());
+                     } else if (c.pane().getTiming() == Timing.ON_TIME
+                         && c.pane().getIndex() != 0) {
+                       unexpectedIndexCounter.addValue(1L);
+                       NexmarkUtils.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
+                     } else {
+                       GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+                       NexmarkUtils.error(
+                           "Index with record timestamp %s, window timestamp %s, pane %s",
+                           c.timestamp(), window.maxTimestamp(), c.pane());
+
+                       @Nullable String filename = indexPathFor(window);
+                       if (filename != null) {
+                         NexmarkUtils.error("Beginning write to '%s'", filename);
+                         int n = 0;
+                         try (OutputStream output =
+                                  Channels.newOutputStream(
+                                      openWritableGcsFile(options, filename))) {
+                           for (OutputFile outputFile : c.element().getValue()) {
+                             // Fix the charset so the index does not depend on the
+                             // default encoding of whichever worker writes it.
+                             output.write(outputFile.toString().getBytes(StandardCharsets.UTF_8));
+                             n++;
+                           }
+                         }
+                         NexmarkUtils.error("Written all %d lines to '%s'", n, filename);
+                       }
+                       c.output(
+                           new Done("written for timestamp " + window.maxTimestamp()));
+                       finalizedCounter.addValue(1L);
+                     }
+                   }
+                 }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java
new file mode 100644
index 0000000..9841421
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query11.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+
+/**
+ * Query "11", 'User sessions' (Not in original suite.)
+ *
+ * <p>Bids are keyed by bidder and windowed into sessions whose maximum gap is
+ * {@code windowSizeSec} seconds. A pane is fired whenever at least {@code maxLogEvents}
+ * elements have accumulated, and the number of bids in each fired pane is emitted.
+ */
+class Query11 extends NexmarkQuery {
+  public Query11(NexmarkConfiguration configuration) {
+    super(configuration, "Query11");
+  }
+
+  /** Builds the bids-per-session pipeline over {@code events}. */
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    // Both durations are derived from the query configuration.
+    Duration sessionGap = Duration.standardSeconds(configuration.windowSizeSec);
+    Duration allowedLateness = Duration.standardSeconds(configuration.occasionalDelaySec / 2);
+
+    // Key every bid by its bidder; the value carries no information as we only count.
+    DoFn<Bid, KV<Long, Void>> rekeyByBidder =
+        new DoFn<Bid, KV<Long, Void>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(KV.of(c.element().bidder, (Void) null));
+          }
+        };
+
+    // Wrap each (bidder, count) pair in the result type.
+    DoFn<KV<Long, Long>, BidsPerSession> toResult =
+        new DoFn<KV<Long, Long>, BidsPerSession>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<Long, Long> bidderAndCount = c.element();
+            c.output(new BidsPerSession(bidderAndCount.getKey(), bidderAndCount.getValue()));
+          }
+        };
+
+    return events
+        .apply(JUST_BIDS)
+        .apply(ParDo.named(name + ".Rekey").of(rekeyByBidder))
+        .apply(
+            Window.<KV<Long, Void>>into(Sessions.withGapDuration(sessionGap))
+                .triggering(
+                    Repeatedly.forever(
+                        AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
+                .discardingFiredPanes()
+                .withAllowedLateness(allowedLateness))
+        .apply(Count.<Long, Void>perKey())
+        .apply(ParDo.named(name + ".ToResult").of(toResult));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<BidsPerSession> sessions = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, sessions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java
new file mode 100644
index 0000000..dd39971
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query12.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "12", 'Processing time windows' (Not in original suite.)
+ *
+ * <p>Bids are keyed by bidder in the global window. A pane is fired repeatedly,
+ * {@code windowSizeSec} seconds of processing time after the first element in the pane,
+ * and the count of bids per bidder in each fired pane is emitted.
+ */
+class Query12 extends NexmarkQuery {
+  public Query12(NexmarkConfiguration configuration) {
+    super(configuration, "Query12");
+  }
+
+  /** Builds the bids-per-processing-time-window pipeline over {@code events}. */
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    // Processing-time delay between the first element of a pane and the pane firing.
+    Duration firingDelay = Duration.standardSeconds(configuration.windowSizeSec);
+
+    // Key every bid by its bidder; the value carries no information as we only count.
+    DoFn<Bid, KV<Long, Void>> rekeyByBidder =
+        new DoFn<Bid, KV<Long, Void>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(KV.of(c.element().bidder, (Void) null));
+          }
+        };
+
+    // Wrap each (bidder, count) pair in the result type.
+    DoFn<KV<Long, Long>, BidsPerSession> toResult =
+        new DoFn<KV<Long, Long>, BidsPerSession>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            KV<Long, Long> bidderAndCount = c.element();
+            c.output(new BidsPerSession(bidderAndCount.getKey(), bidderAndCount.getValue()));
+          }
+        };
+
+    return events
+        .apply(JUST_BIDS)
+        .apply(ParDo.named(name + ".Rekey").of(rekeyByBidder))
+        .apply(
+            Window.<KV<Long, Void>>into(new GlobalWindows())
+                .triggering(
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(firingDelay)))
+                .discardingFiredPanes()
+                .withAllowedLateness(Duration.ZERO))
+        .apply(Count.<Long, Void>perKey())
+        .apply(ParDo.named(name + ".ToResult").of(toResult));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<BidsPerSession> counts = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, counts);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
new file mode 100644
index 0000000..462d426
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Query1Model.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A direct implementation of {@link Query1}: each bid is re-emitted with its price
+ * multiplied by 89 and divided by 100, keeping the timestamp of the originating event.
+ */
+public class Query1Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 1. Consumes the standard event stream one event per step.
+   */
+  private class Simulator extends AbstractSimulator<Event, Bid> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    /** Processes the next input event, if any, adding at most one result. */
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next == null) {
+        // Input is exhausted.
+        allDone();
+        return;
+      }
+      Bid dollarBid = next.getValue().bid;
+      if (dollarBid != null) {
+        // Only bid events produce a result; every field but the price is copied as-is.
+        Bid euroBid =
+            new Bid(
+                dollarBid.auction,
+                dollarBid.bidder,
+                dollarBid.price * 89 / 100,
+                dollarBid.dateTime,
+                dollarBid.extra);
+        addResult(TimestampedValue.of(euroBid, next.getTimestamp()));
+      }
+    }
+  }
+
+  public Query1Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Returns a fresh simulator driven by this model's configuration. */
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Converts the results via {@code toValueTimestampOrder}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}