You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:20 UTC

[12/55] [abbrv] beam git commit: Refactor classes into packages

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
deleted file mode 100644
index 9573ef7..0000000
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/UnboundedEventSource.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.integration.nexmark;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A custom, unbounded source of event records.
- *
- * <p>If {@code isRateLimited} is true, events become available for return from the reader such
- * that the overall rate respect the {@code interEventDelayUs} period if possible. Otherwise,
- * events are returned every time the system asks for one.
- */
-class UnboundedEventSource extends UnboundedSource<Event, Generator.Checkpoint> {
-  private static final Duration BACKLOG_PERIOD = Duration.standardSeconds(30);
-  private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
-
-  /** Configuration for generator to use when reading synthetic events. May be split. */
-  private final GeneratorConfig config;
-
-  /** How many unbounded sources to create. */
-  private final int numEventGenerators;
-
-  /** How many seconds to hold back the watermark. */
-  private final long watermarkHoldbackSec;
-
-  /** Are we rate limiting the events? */
-  private final boolean isRateLimited;
-
-  public UnboundedEventSource(GeneratorConfig config, int numEventGenerators,
-      long watermarkHoldbackSec, boolean isRateLimited) {
-    this.config = config;
-    this.numEventGenerators = numEventGenerators;
-    this.watermarkHoldbackSec = watermarkHoldbackSec;
-    this.isRateLimited = isRateLimited;
-  }
-
-  /** A reader to pull events from the generator. */
-  private class EventReader extends UnboundedReader<Event> {
-    /** Generator we are reading from. */
-    private final Generator generator;
-
-    /**
-     * Current watermark (ms since epoch). Initially set to beginning of time.
-     * Then updated to be the time of the next generated event.
-     * Then, once all events have been generated, set to the end of time.
-     */
-    private long watermark;
-
-    /**
-     * Current backlog (ms), as delay between timestamp of last returned event and the timestamp
-     * we should be up to according to wall-clock time. Used only for logging.
-     */
-    private long backlogDurationMs;
-
-    /**
-     * Current backlog, as estimated number of event bytes we are behind, or null if
-     * unknown. Reported to callers.
-     */
-    @Nullable
-    private Long backlogBytes;
-
-    /**
-     * Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported.
-     */
-    private long lastReportedBacklogWallclock;
-
-    /**
-     * Event time (ms since epoch) of pending event at last reported backlog, or -1 if never
-     * calculated.
-     */
-    private long timestampAtLastReportedBacklogMs;
-
-    /** Next event to make 'current' when wallclock time has advanced sufficiently. */
-    @Nullable
-    private TimestampedValue<Event> pendingEvent;
-
-    /** Wallclock time when {@link #pendingEvent} is due, or -1 if no pending event. */
-    private long pendingEventWallclockTime;
-
-    /** Current event to return from getCurrent. */
-    @Nullable
-    private TimestampedValue<Event> currentEvent;
-
-    /** Events which have been held back so as to force them to be late. */
-    private Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<>();
-
-    public EventReader(Generator generator) {
-      this.generator = generator;
-      watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
-      lastReportedBacklogWallclock = -1;
-      pendingEventWallclockTime = -1;
-      timestampAtLastReportedBacklogMs = -1;
-    }
-
-    public EventReader(GeneratorConfig config) {
-      this(new Generator(config));
-    }
-
-    @Override
-    public boolean start() {
-      LOG.trace("starting unbounded generator {}", generator);
-      return advance();
-    }
-
-
-    @Override
-    public boolean advance() {
-      long now = System.currentTimeMillis();
-
-      while (pendingEvent == null) {
-        if (!generator.hasNext() && heldBackEvents.isEmpty()) {
-          // No more events, EVER.
-          if (isRateLimited) {
-            updateBacklog(System.currentTimeMillis(), 0);
-          }
-          if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-            watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
-            LOG.trace("stopped unbounded generator {}", generator);
-          }
-          return false;
-        }
-
-        Generator.NextEvent next = heldBackEvents.peek();
-        if (next != null && next.wallclockTimestamp <= now) {
-          // Time to use the held-back event.
-          heldBackEvents.poll();
-          LOG.debug("replaying held-back event {}ms behind watermark",
-                             watermark - next.eventTimestamp);
-        } else if (generator.hasNext()) {
-          next = generator.nextEvent();
-          if (isRateLimited && config.configuration.probDelayedEvent > 0.0
-              && config.configuration.occasionalDelaySec > 0
-              && ThreadLocalRandom.current().nextDouble() < config.configuration.probDelayedEvent) {
-            // We'll hold back this event and go around again.
-            long delayMs =
-                ThreadLocalRandom.current().nextLong(config.configuration.occasionalDelaySec * 1000)
-                + 1L;
-            LOG.debug("delaying event by {}ms", delayMs);
-            heldBackEvents.add(next.withDelay(delayMs));
-            continue;
-          }
-        } else {
-          // Waiting for held-back event to fire.
-          if (isRateLimited) {
-            updateBacklog(now, 0);
-          }
-          return false;
-        }
-
-        pendingEventWallclockTime = next.wallclockTimestamp;
-        pendingEvent = TimestampedValue.of(next.event, new Instant(next.eventTimestamp));
-        long newWatermark =
-            next.watermark - Duration.standardSeconds(watermarkHoldbackSec).getMillis();
-        if (newWatermark > watermark) {
-          watermark = newWatermark;
-        }
-      }
-
-      if (isRateLimited) {
-        if (pendingEventWallclockTime > now) {
-          // We want this event to fire in the future. Try again later.
-          updateBacklog(now, 0);
-          return false;
-        }
-        updateBacklog(now, now - pendingEventWallclockTime);
-      }
-
-      // This event is ready to fire.
-      currentEvent = pendingEvent;
-      pendingEvent = null;
-      return true;
-    }
-
-    private void updateBacklog(long now, long newBacklogDurationMs) {
-      backlogDurationMs = newBacklogDurationMs;
-      long interEventDelayUs = generator.currentInterEventDelayUs();
-      if (interEventDelayUs != 0) {
-        long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
-        backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
-      }
-      if (lastReportedBacklogWallclock < 0
-          || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
-        double timeDialation = Double.NaN;
-        if (pendingEvent != null
-            && lastReportedBacklogWallclock >= 0
-            && timestampAtLastReportedBacklogMs >= 0) {
-          long wallclockProgressionMs = now - lastReportedBacklogWallclock;
-          long eventTimeProgressionMs =
-              pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
-          timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
-        }
-        LOG.debug(
-            "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay "
-            + "with {} time dilation",
-            backlogDurationMs, backlogBytes, interEventDelayUs, timeDialation);
-        lastReportedBacklogWallclock = now;
-        if (pendingEvent != null) {
-          timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis();
-        }
-      }
-    }
-
-    @Override
-    public Event getCurrent() {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getValue();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() {
-      if (currentEvent == null) {
-        throw new NoSuchElementException();
-      }
-      return currentEvent.getTimestamp();
-    }
-
-    @Override
-    public void close() {
-      // Nothing to close.
-    }
-
-    @Override
-    public UnboundedEventSource getCurrentSource() {
-      return UnboundedEventSource.this;
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return new Instant(watermark);
-    }
-
-    @Override
-    public Generator.Checkpoint getCheckpointMark() {
-      return generator.toCheckpoint();
-    }
-
-    @Override
-    public long getSplitBacklogBytes() {
-      return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
-    }
-
-    @Override
-    public String toString() {
-      return String.format("EventReader(%d, %d, %d)",
-          generator.getCurrentConfig().getStartEventId(), generator.getNextEventId(),
-          generator.getCurrentConfig().getStopEventId());
-    }
-  }
-
-  @Override
-  public Coder<Generator.Checkpoint> getCheckpointMarkCoder() {
-    return Generator.Checkpoint.CODER_INSTANCE;
-  }
-
-  @Override
-  public List<UnboundedEventSource> generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) {
-    LOG.trace("splitting unbounded source into {} sub-sources", numEventGenerators);
-    List<UnboundedEventSource> results = new ArrayList<>();
-    // Ignore desiredNumSplits and use numEventGenerators instead.
-    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
-      results.add(new UnboundedEventSource(subConfig, 1, watermarkHoldbackSec, isRateLimited));
-    }
-    return results;
-  }
-
-  @Override
-  public EventReader createReader(
-      PipelineOptions options, @Nullable Generator.Checkpoint checkpoint) {
-    if (checkpoint == null) {
-      LOG.trace("creating initial unbounded reader for {}", config);
-      return new EventReader(config);
-    } else {
-      LOG.trace("resuming unbounded reader from {}", checkpoint);
-      return new EventReader(checkpoint.toGenerator(config));
-    }
-  }
-
-  @Override
-  public void validate() {
-    // Nothing to validate.
-  }
-
-  @Override
-  public Coder<Event> getDefaultOutputCoder() {
-    return Event.CODER;
-  }
-
-  @Override
-  public String toString() {
-    return String.format(
-        "UnboundedEventSource(%d, %d)", config.getStartEventId(), config.getStopEventId());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
index 594195a..9f1ddf8 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBids.java
@@ -20,7 +20,6 @@ package org.apache.beam.integration.nexmark;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,7 +30,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.integration.nexmark.sources.GeneratorConfig;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -45,10 +48,10 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
 import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
@@ -249,7 +252,7 @@ public class WinningBids extends PTransform<PCollection<Event>, PCollection<Auct
     }
 
     @Override
-    public AuctionOrBidWindow getSideInputWindow(BoundedWindow window) {
+    public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
       throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
index dc8094b..e7f51b7 100644
--- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/WinningBidsSimulator.java
@@ -26,6 +26,10 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.AuctionBid;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Event;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
new file mode 100644
index 0000000..265ccf7
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexDriver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Run NexMark queries using the Apex runner.
+ */
+public class NexmarkApexDriver extends NexmarkDriver<NexmarkApexDriver.NexmarkApexOptions> {
+  /**
+   * Command line flags.
+   */
+  public interface NexmarkApexOptions extends NexmarkOptions, ApexPipelineOptions {
+  }
+
+  /**
+   * Entry point.
+   */
+  public static void main(String[] args) {
+    // Gather command line args, baseline, configurations, etc.
+    NexmarkApexOptions options = PipelineOptionsFactory.fromArgs(args)
+                                                        .withValidation()
+                                                        .as(NexmarkApexOptions.class);
+    options.setRunner(ApexRunner.class);
+    NexmarkApexRunner runner = new NexmarkApexRunner(options);
+    new NexmarkApexDriver().runAll(options, runner);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
new file mode 100644
index 0000000..2bcf82d
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkApexRunner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.integration.nexmark.NexmarkPerf;
+import org.apache.beam.integration.nexmark.NexmarkQuery;
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a query using the Apex runner.
+ */
+public class NexmarkApexRunner extends NexmarkRunner<NexmarkApexDriver.NexmarkApexOptions> {
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  @Override
+  protected int maxNumWorkers() {
+    return 5;
+  }
+
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(
+      PipelineBuilder builder) {
+    builder.build(options);
+  }
+
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  @Nullable
+  protected NexmarkPerf monitor(NexmarkQuery query) {
+    return null;
+  }
+
+  public NexmarkApexRunner(NexmarkApexDriver.NexmarkApexOptions options) {
+    super(options);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
new file mode 100644
index 0000000..2b825f3
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectDriver.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.runners.direct.DirectRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An implementation of the 'NEXMark queries' using the Direct Runner.
+ */
+class NexmarkDirectDriver extends NexmarkDriver<NexmarkDirectDriver.NexmarkDirectOptions> {
+  /**
+   * Command line flags.
+   */
+  public interface NexmarkDirectOptions extends NexmarkOptions, DirectOptions {
+  }
+
+  /**
+   * Entry point.
+   */
+  public static void main(String[] args) {
+    NexmarkDirectOptions options =
+        PipelineOptionsFactory.fromArgs(args)
+                              .withValidation()
+                              .as(NexmarkDirectOptions.class);
+    options.setRunner(DirectRunner.class);
+    NexmarkDirectRunner runner = new NexmarkDirectRunner(options);
+    new NexmarkDirectDriver().runAll(options, runner);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
new file mode 100644
index 0000000..1391040
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkDirectRunner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a single query using the Direct Runner.
+ */
+class NexmarkDirectRunner extends NexmarkRunner<NexmarkDirectDriver.NexmarkDirectOptions> {
+  public NexmarkDirectRunner(NexmarkDirectDriver.NexmarkDirectOptions options) {
+    super(options);
+  }
+
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  @Override
+  protected int maxNumWorkers() {
+    return 1;
+  }
+
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    throw new UnsupportedOperationException(
+        "Cannot use --pubSubMode=COMBINED with DirectRunner");
+  }
+
+  /**
+   * Monitor the progress of the publisher job. Return when it has been generating events for
+   * at least {@code configuration.preloadSeconds}.
+   */
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException(
+        "Cannot use --pubSubMode=COMBINED with DirectRunner");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
new file mode 100644
index 0000000..bf0b115
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkDriver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Run NexMark queries using the Flink runner.
+ */
+public class NexmarkFlinkDriver extends NexmarkDriver<NexmarkFlinkDriver.NexmarkFlinkOptions> {
+  /**
+   * Command line flags.
+   */
+  public interface NexmarkFlinkOptions extends NexmarkOptions, FlinkPipelineOptions {
+  }
+
+  /**
+   * Entry point.
+   */
+  public static void main(String[] args) {
+    // Gather command line args, baseline, configurations, etc.
+    NexmarkFlinkOptions options = PipelineOptionsFactory.fromArgs(args)
+                                                        .withValidation()
+                                                        .as(NexmarkFlinkOptions.class);
+    options.setRunner(FlinkRunner.class);
+    NexmarkFlinkRunner runner = new NexmarkFlinkRunner(options);
+    new NexmarkFlinkDriver().runAll(options, runner);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
new file mode 100644
index 0000000..9d547ef
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkFlinkRunner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a query using the Flink runner.
+ */
+public class NexmarkFlinkRunner extends NexmarkRunner<NexmarkFlinkDriver.NexmarkFlinkOptions> {
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  @Override
+  protected int coresPerWorker() {
+    return 4;
+  }
+
+  @Override
+  protected int maxNumWorkers() {
+    return 5;
+  }
+
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(
+      PipelineBuilder builder) {
+    builder.build(options);
+  }
+
+  @Override
+  protected void waitForPublisherPreload() {
+    throw new UnsupportedOperationException();
+  }
+
+  public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
+    super(options);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
new file mode 100644
index 0000000..f5a9751
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleDriver.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.integration.nexmark.model.Auction;
+import org.apache.beam.integration.nexmark.model.Bid;
+import org.apache.beam.integration.nexmark.model.Person;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * An implementation of the 'NEXMark queries' for Google Dataflow.
+ * These are multiple queries over a three table schema representing an online auction system:
+ * <ul>
+ * <li>{@link Person} represents a person submitting an item for auction and/or making a bid
+ * on an auction.
+ * <li>{@link Auction} represents an item under auction.
+ * <li>{@link Bid} represents a bid for an item under auction.
+ * </ul>
+ * The queries exercise many aspects of streaming dataflow.
+ *
+ * <p>We synthesize the creation of people, auctions and bids in real-time. The data is not
+ * particularly sensible.
+ *
+ * <p>See <a href="http://datalab.cs.pdx.edu/niagaraST/NEXMark/">
+ * http://datalab.cs.pdx.edu/niagaraST/NEXMark/</a>
+ */
+class NexmarkGoogleDriver extends NexmarkDriver<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+  /**
+   * Command line flags.
+   */
+  public interface NexmarkGoogleOptions extends NexmarkOptions, DataflowPipelineOptions {
+
+  }
+
+  /**
+   * Entry point.
+   */
+  public static void main(String[] args) {
+    // Gather command line args, baseline, configurations, etc.
+    NexmarkGoogleOptions options = PipelineOptionsFactory.fromArgs(args)
+                                                         .withValidation()
+                                                         .as(NexmarkGoogleOptions.class);
+    options.setRunner(DataflowRunner.class);
+    NexmarkGoogleRunner runner = new NexmarkGoogleRunner(options);
+    new NexmarkGoogleDriver().runAll(options, runner);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
new file mode 100644
index 0000000..7ffd47a
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkGoogleRunner.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.integration.nexmark.Monitor;
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.model.Event;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
+import org.apache.beam.sdk.PipelineResult;
+import org.joda.time.Duration;
+
+/**
+ * Run a singe Nexmark query using a given configuration on Google Dataflow.
+ */
+class NexmarkGoogleRunner extends NexmarkRunner<NexmarkGoogleDriver.NexmarkGoogleOptions> {
+
+  public NexmarkGoogleRunner(NexmarkGoogleDriver.NexmarkGoogleOptions options) {
+    super(options);
+  }
+
+  @Override
+  protected boolean isStreaming() {
+    return options.isStreaming();
+  }
+
+  @Override
+  protected int coresPerWorker() {
+    String machineType = options.getWorkerMachineType();
+    if (machineType == null || machineType.isEmpty()) {
+      return 1;
+    }
+    String[] split = machineType.split("-");
+    if (split.length != 3) {
+      return 1;
+    }
+    try {
+      return Integer.parseInt(split[2]);
+    } catch (NumberFormatException ex) {
+      return 1;
+    }
+  }
+
+  @Override
+  protected int maxNumWorkers() {
+    return Math.max(options.getNumWorkers(), options.getMaxNumWorkers());
+  }
+
+  @Override
+  protected String getJobId(PipelineResult job) {
+    return ((DataflowPipelineJob) job).getJobId();
+  }
+
+  @Override
+  protected void invokeBuilderForPublishOnlyPipeline(PipelineBuilder builder) {
+    String jobName = options.getJobName();
+    String appName = options.getAppName();
+    options.setJobName("p-" + jobName);
+    options.setAppName("p-" + appName);
+    int coresPerWorker = coresPerWorker();
+    int eventGeneratorWorkers = (configuration.numEventGenerators + coresPerWorker - 1)
+                                / coresPerWorker;
+    options.setMaxNumWorkers(Math.min(options.getMaxNumWorkers(), eventGeneratorWorkers));
+    options.setNumWorkers(Math.min(options.getNumWorkers(), eventGeneratorWorkers));
+    publisherMonitor = new Monitor<Event>(queryName, "publisher");
+    try {
+      builder.build(options);
+    } finally {
+      options.setJobName(jobName);
+      options.setAppName(appName);
+      options.setMaxNumWorkers(options.getMaxNumWorkers());
+      options.setNumWorkers(options.getNumWorkers());
+    }
+  }
+
+  /**
+   * Monitor the progress of the publisher job. Return when it has been generating events for
+   * at least {@code configuration.preloadSeconds}.
+   */
+  @Override
+  protected void waitForPublisherPreload() {
+    checkNotNull(publisherMonitor);
+    checkNotNull(publisherResult);
+    if (!options.getMonitorJobs()) {
+      return;
+    }
+    if (!(publisherResult instanceof DataflowPipelineJob)) {
+      return;
+    }
+    if (configuration.preloadSeconds <= 0) {
+      return;
+    }
+
+    NexmarkUtils.console("waiting for publisher to pre-load");
+
+    DataflowPipelineJob job = (DataflowPipelineJob) publisherResult;
+
+    long numEvents = 0;
+    long startMsSinceEpoch = -1;
+    long endMsSinceEpoch = -1;
+    while (true) {
+      PipelineResult.State state = job.getState();
+      switch (state) {
+        case UNKNOWN:
+          // Keep waiting.
+          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          break;
+        case STOPPED:
+        case DONE:
+        case CANCELLED:
+        case FAILED:
+        case UPDATED:
+          NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          return;
+        case RUNNING:
+          numEvents = getLong(job, publisherMonitor.getElementCounter());
+          if (startMsSinceEpoch < 0 && numEvents > 0) {
+            startMsSinceEpoch = System.currentTimeMillis();
+            endMsSinceEpoch = startMsSinceEpoch
+                              + Duration.standardSeconds(configuration.preloadSeconds).getMillis();
+          }
+          if (endMsSinceEpoch < 0) {
+            NexmarkUtils.console("%s publisher (%d events)", state, numEvents);
+          } else {
+            long remainMs = endMsSinceEpoch - System.currentTimeMillis();
+            if (remainMs > 0) {
+              NexmarkUtils.console("%s publisher (%d events, waiting for %ds)", state, numEvents,
+                  remainMs / 1000);
+            } else {
+              NexmarkUtils.console("publisher preloaded %d events", numEvents);
+              return;
+            }
+          }
+          break;
+      }
+
+      try {
+        Thread.sleep(PERF_DELAY.getMillis());
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new RuntimeException("Interrupted: publisher still running.");
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
new file mode 100644
index 0000000..c7c32c2
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkDriver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkDriver;
+import org.apache.beam.integration.nexmark.NexmarkOptions;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkRunner;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+
+/**
+ * Run NexMark queries using the Spark runner.
+ */
+class NexmarkSparkDriver extends NexmarkDriver<NexmarkSparkDriver.NexmarkSparkOptions> {
+    /**
+     * Command line flags.
+     */
+    public interface NexmarkSparkOptions extends NexmarkOptions, SparkPipelineOptions {
+    }
+
+    /**
+     * Entry point.
+     */
+    public static void main(String[] args) {
+        NexmarkSparkOptions options =
+                PipelineOptionsFactory.fromArgs(args)
+                        .withValidation()
+                        .as(NexmarkSparkOptions.class);
+        options.setRunner(SparkRunner.class);
+        NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
+        new NexmarkSparkDriver().runAll(options, runner);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
new file mode 100644
index 0000000..1d49a3a
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/NexmarkSparkRunner.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.drivers;
+
+import org.apache.beam.integration.nexmark.NexmarkRunner;
+
+/**
+ * Run a query using the Spark runner.
+ */
+public class NexmarkSparkRunner extends NexmarkRunner<NexmarkSparkDriver.NexmarkSparkOptions> {
+    @Override
+    protected boolean isStreaming() {
+        return options.isStreaming();
+    }
+
+    @Override
+    protected int coresPerWorker() {
+        return 4;
+    }
+
+    @Override
+    protected int maxNumWorkers() {
+        return 5;
+    }
+
+    @Override
+    protected void invokeBuilderForPublishOnlyPipeline(
+            PipelineBuilder builder) {
+        builder.build(options);
+    }
+
+    @Override
+    protected void waitForPublisherPreload() {
+        throw new UnsupportedOperationException();
+    }
+
+
+    public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
+        super(options);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
new file mode 100644
index 0000000..c8aa144
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/drivers/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Benchmark Execution Drivers.
+ */
+package org.apache.beam.integration.nexmark.drivers;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
new file mode 100644
index 0000000..f5cfc2b
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/PubsubHelper.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubJsonClient;
+
+/**
+ * Helper for working with pubsub.
+ */
+public class PubsubHelper implements AutoCloseable {
+  /**
+   * Underlying pub/sub client.
+   */
+  private final PubsubClient pubsubClient;
+
+  /**
+   * Project id.
+   */
+  private final String projectId;
+
+  /**
+   * Topics we should delete on close.
+   */
+  private final List<PubsubClient.TopicPath> createdTopics;
+
+  /**
+   * Subscriptions we should delete on close.
+   */
+  private final List<PubsubClient.SubscriptionPath> createdSubscriptions;
+
+  private PubsubHelper(PubsubClient pubsubClient, String projectId) {
+    this.pubsubClient = pubsubClient;
+    this.projectId = projectId;
+    createdTopics = new ArrayList<>();
+    createdSubscriptions = new ArrayList<>();
+  }
+
+  /**
+   * Create a helper.
+   */
+  public static PubsubHelper create(PubsubOptions options) {
+    try {
+      return new PubsubHelper(
+          PubsubJsonClient.FACTORY.newClient(null, null, options),
+          options.getProject());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create Pubsub client: ", e);
+    }
+  }
+
+  /**
+   * Create a topic from short name. Delete it if it already exists. Ensure the topic will be
+   * deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.TopicPath createTopic(String shortTopic) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    try {
+      if (topicExists(shortTopic)) {
+        NexmarkUtils.console("attempting to cleanup topic %s", topic);
+        pubsubClient.deleteTopic(topic);
+      }
+      NexmarkUtils.console("create topic %s", topic);
+      pubsubClient.createTopic(topic);
+      createdTopics.add(topic);
+      return topic;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create Pubsub topic " + topic + ": ", e);
+    }
+  }
+
+  /**
+   * Create a topic from short name if it does not already exist. The topic will not be
+   * deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.TopicPath createOrReuseTopic(String shortTopic) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    try {
+      if (topicExists(shortTopic)) {
+        NexmarkUtils.console("topic %s already exists", topic);
+        return topic;
+      }
+      NexmarkUtils.console("create topic %s", topic);
+      pubsubClient.createTopic(topic);
+      return topic;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create or reuse Pubsub topic " + topic + ": ", e);
+    }
+  }
+
+  /**
+   * Check a topic corresponding to short name exists, and throw exception if not. The
+   * topic will not be deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.TopicPath reuseTopic(String shortTopic) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    if (topicExists(shortTopic)) {
+      NexmarkUtils.console("reusing existing topic %s", topic);
+      return topic;
+    }
+    throw new RuntimeException("topic '" + topic + "' does not already exist");
+  }
+
+  /**
+   * Does topic corresponding to short name exist?
+   */
+  public boolean topicExists(String shortTopic) {
+    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    try {
+      Collection<PubsubClient.TopicPath> existingTopics = pubsubClient.listTopics(project);
+      return existingTopics.contains(topic);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to check Pubsub topic " + topic + ": ", e);
+    }
+  }
+
+  /**
+   * Create subscription from short name. Delete subscription if it already exists. Ensure the
+   * subscription will be deleted on cleanup. Return full subscription name.
+   */
+  public PubsubClient.SubscriptionPath createSubscription(
+      String shortTopic, String shortSubscription) {
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    PubsubClient.SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+    try {
+      if (subscriptionExists(shortTopic, shortSubscription)) {
+        NexmarkUtils.console("attempting to cleanup subscription %s", subscription);
+        pubsubClient.deleteSubscription(subscription);
+      }
+      NexmarkUtils.console("create subscription %s", subscription);
+      pubsubClient.createSubscription(topic, subscription, 60);
+      createdSubscriptions.add(subscription);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to create Pubsub subscription " + subscription + ": ", e);
+    }
+    return subscription;
+  }
+
+  /**
+   * Check a subscription corresponding to short name exists, and throw exception if not. The
+   * subscription will not be deleted on cleanup. Return full topic name.
+   */
+  public PubsubClient.SubscriptionPath reuseSubscription(
+      String shortTopic, String shortSubscription) {
+    PubsubClient.SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+    if (subscriptionExists(shortTopic, shortSubscription)) {
+      NexmarkUtils.console("reusing existing subscription %s", subscription);
+      return subscription;
+    }
+    throw new RuntimeException("subscription'" + subscription + "' does not already exist");
+  }
+
+  /**
+   * Does subscription corresponding to short name exist?
+   */
+  public boolean subscriptionExists(String shortTopic, String shortSubscription) {
+    PubsubClient.ProjectPath project = PubsubClient.projectPathFromId(projectId);
+    PubsubClient.TopicPath topic = PubsubClient.topicPathFromName(projectId, shortTopic);
+    PubsubClient.SubscriptionPath subscription =
+        PubsubClient.subscriptionPathFromName(projectId, shortSubscription);
+    try {
+      Collection<PubsubClient.SubscriptionPath> existingSubscriptions =
+          pubsubClient.listSubscriptions(project, topic);
+      return existingSubscriptions.contains(subscription);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to check Pubsub subscription" + subscription + ": ", e);
+    }
+  }
+
+  /**
+   * Delete all the subscriptions and topics we created.
+   */
+  @Override
+  public void close() {
+    for (PubsubClient.SubscriptionPath subscription : createdSubscriptions) {
+      try {
+        NexmarkUtils.console("delete subscription %s", subscription);
+        pubsubClient.deleteSubscription(subscription);
+      } catch (IOException ex) {
+        NexmarkUtils.console("could not delete subscription %s", subscription);
+      }
+    }
+    for (PubsubClient.TopicPath topic : createdTopics) {
+      try {
+        NexmarkUtils.console("delete topic %s", topic);
+        pubsubClient.deleteTopic(topic);
+      } catch (IOException ex) {
+        NexmarkUtils.console("could not delete topic %s", topic);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
new file mode 100644
index 0000000..1161f3e
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Beam IO related utilities.
+ */
+package org.apache.beam.integration.nexmark.io;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
new file mode 100644
index 0000000..ac30568
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Auction.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * An auction submitted by a person.
+ */
+public class Auction implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
+    @Override
+    public void encode(Auction value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
+      STRING_CODER.encode(value.description, outStream, Context.NESTED);
+      LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
+      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      LONG_CODER.encode(value.expires, outStream, Context.NESTED);
+      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+      LONG_CODER.encode(value.category, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Auction decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      String itemName = STRING_CODER.decode(inStream, Context.NESTED);
+      String description = STRING_CODER.decode(inStream, Context.NESTED);
+      long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
+      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      long expires = LONG_CODER.decode(inStream, Context.NESTED);
+      long seller = LONG_CODER.decode(inStream, Context.NESTED);
+      long category = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Auction(
+          id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
+          extra);
+    }
+  };
+
+
+  /** Id of auction. */
+  @JsonProperty
+  public final long id; // primary key
+
+  /** Extra auction properties. */
+  @JsonProperty
+  public final String itemName;
+
+  @JsonProperty
+  public final String description;
+
+  /** Initial bid price, in cents. */
+  @JsonProperty
+  public final long initialBid;
+
+  /** Reserve price, in cents. */
+  @JsonProperty
+  public final long reserve;
+
+  @JsonProperty
+  public final long dateTime;
+
+  /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
+  @JsonProperty
+  public final long expires;
+
+  /** Id of person who instigated auction. */
+  @JsonProperty
+  public final long seller; // foreign key: Person.id
+
+  /** Id of category auction is listed under. */
+  @JsonProperty
+  public final long category; // foreign key: Category.id
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Auction() {
+    id = 0;
+    itemName = null;
+    description = null;
+    initialBid = 0;
+    reserve = 0;
+    dateTime = 0;
+    expires = 0;
+    seller = 0;
+    category = 0;
+    extra = null;
+  }
+
+  public Auction(long id, String itemName, String description, long initialBid, long reserve,
+      long dateTime, long expires, long seller, long category, String extra) {
+    this.id = id;
+    this.itemName = itemName;
+    this.description = description;
+    this.initialBid = initialBid;
+    this.reserve = reserve;
+    this.dateTime = dateTime;
+    this.expires = expires;
+    this.seller = seller;
+    this.category = category;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of auction which capture the given annotation.
+   * (Used for debugging).
+   */
+  public Auction withAnnotation(String annotation) {
+    return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+        category, annotation + ": " + extra);
+  }
+
+  /**
+   * Does auction have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from auction. (Used for debugging.)
+   */
+  public Auction withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+          category, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
+        + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
new file mode 100644
index 0000000..c014257
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionBid.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.integration.nexmark.WinningBids;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+/**
+ * Result of {@link WinningBids} transform.
+ */
+public class AuctionBid implements KnownSize, Serializable {
+  public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
+    @Override
+    public void encode(AuctionBid value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      Auction.CODER.encode(value.auction, outStream, Context.NESTED);
+      Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionBid decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+      Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+      return new AuctionBid(auction, bid);
+    }
+  };
+
+  @JsonProperty
+  public final Auction auction;
+
+  @JsonProperty
+  public final Bid bid;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionBid() {
+    auction = null;
+    bid = null;
+  }
+
+  public AuctionBid(Auction auction, Bid bid) {
+    this.auction = auction;
+    this.bid = bid;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return auction.sizeInBytes() + bid.sizeInBytes();
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
new file mode 100644
index 0000000..aa16629
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionCount.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query5.
+ */
+public class AuctionCount implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
+    @Override
+    public void encode(AuctionCount value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.count, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionCount decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long count = LONG_CODER.decode(inStream, Context.NESTED);
+      return new AuctionCount(auction, count);
+    }
+  };
+
+  @JsonProperty
+  public final long auction;
+
+  @JsonProperty
+  public final long count;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionCount() {
+    auction = 0;
+    count = 0;
+  }
+
+  public AuctionCount(long auction, long count) {
+    this.auction = auction;
+    this.count = count;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
new file mode 100644
index 0000000..f365cc8
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/AuctionPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of Query2.
+ */
+public class AuctionPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
+    @Override
+    public void encode(AuctionPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      return new AuctionPrice(auction, price);
+    }
+  };
+
+  @JsonProperty
+  public final long auction;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionPrice() {
+    auction = 0;
+    price = 0;
+  }
+
+  public AuctionPrice(long auction, long price) {
+    this.auction = auction;
+    this.price = price;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
new file mode 100644
index 0000000..59a33c1
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/Bid.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * A bid for an item on auction.
+ */
+public class Bid implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
+    @Override
+    public void encode(Bid value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Bid decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long bidder = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Bid(auction, bidder, price, dateTime, extra);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending price then descending time
+   * (for finding winning bids).
+   */
+  public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      int i = Double.compare(left.price, right.price);
+      if (i != 0) {
+        return i;
+      }
+      return Long.compare(right.dateTime, left.dateTime);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending time then ascending price.
+   * (for finding most recent bids).
+   */
+  public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      int i = Long.compare(left.dateTime, right.dateTime);
+      if (i != 0) {
+        return i;
+      }
+      return Double.compare(left.price, right.price);
+    }
+  };
+
+  /** Id of auction this bid is for. */
+  @JsonProperty
+  public final long auction; // foreign key: Auction.id
+
+  /** Id of person bidding in auction. */
+  @JsonProperty
+  public final long bidder; // foreign key: Person.id
+
+  /** Price of bid, in cents. */
+  @JsonProperty
+  public final long price;
+
+  /**
+   * Instant at which bid was made (ms since epoch).
+   * NOTE: This may be earlier than the system's event time.
+   */
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Bid() {
+    auction = 0;
+    bidder = 0;
+    price = 0;
+    dateTime = 0;
+    extra = null;
+  }
+
+  public Bid(long auction, long bidder, long price, long dateTime, String extra) {
+    this.auction = auction;
+    this.bidder = bidder;
+    this.price = price;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of bid which capture the given annotation.
+   * (Used for debugging).
+   */
+  public Bid withAnnotation(String annotation) {
+    return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
+  }
+
+  /**
+   * Does bid have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from bid. (Used for debugging.)
+   */
+  public Bid withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8 + 8 + 8 + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7f9f7d0/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
new file mode 100644
index 0000000..7c4dfae
--- /dev/null
+++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/model/BidsPerSession.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.integration.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import org.apache.beam.integration.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+/**
+ * Result of query 11.
+ */
+public class BidsPerSession implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
+    @Override
+    public void encode(BidsPerSession value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.personId, outStream, Context.NESTED);
+      LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
+    }
+
+    @Override
+    public BidsPerSession decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long personId = LONG_CODER.decode(inStream, Context.NESTED);
+      long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
+      return new BidsPerSession(personId, bidsPerSession);
+    }
+  };
+
+  @JsonProperty
+  public final long personId;
+
+  @JsonProperty
+  public final long bidsPerSession;
+
+  public BidsPerSession() {
+    personId = 0;
+    bidsPerSession = 0;
+  }
+
+  public BidsPerSession(long personId, long bidsPerSession) {
+    this.personId = personId;
+    this.bidsPerSession = bidsPerSession;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two longs.
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}