You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:52 UTC

[44/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
new file mode 100644
index 0000000..fa1ef16
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkUtils.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.AuctionBid;
+import org.apache.beam.sdk.nexmark.model.AuctionCount;
+import org.apache.beam.sdk.nexmark.model.AuctionPrice;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.BidsPerSession;
+import org.apache.beam.sdk.nexmark.model.CategoryPrice;
+import org.apache.beam.sdk.nexmark.model.Done;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.IdNameReserve;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.NameCityStateId;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.nexmark.model.SellerPrice;
+import org.apache.beam.sdk.nexmark.sources.BoundedEventSource;
+import org.apache.beam.sdk.nexmark.sources.Generator;
+import org.apache.beam.sdk.nexmark.sources.GeneratorConfig;
+import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Odds 'n Ends used throughout queries and driver.
+ */
+public class NexmarkUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class);
+
+  /**
+   * Mapper for (de)serializing JSON.
+   */
+  public static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Possible sources for events.
+   */
+  public enum SourceType {
+    /**
+     * Produce events directly.
+     */
+    DIRECT,
+    /**
+     * Read events from an Avro file.
+     */
+    AVRO,
+    /**
+     * Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
+     */
+    PUBSUB
+  }
+
+  /**
+   * Possible sinks for query results.
+   */
+  public enum SinkType {
+    /**
+     * Discard all results.
+     */
+    COUNT_ONLY,
+    /**
+     * Discard all results after converting them to strings.
+     */
+    DEVNULL,
+    /**
+     * Write to a PubSub topic. It will be drained by this pipeline.
+     */
+    PUBSUB,
+    /**
+     * Write to a text file. Only works in batch mode.
+     */
+    TEXT,
+    /**
+     * Write raw Events to Avro. Only works in batch mode.
+     */
+    AVRO,
+    /**
+     * Write raw Events to BigQuery.
+     */
+    BIGQUERY,
+  }
+
+  /**
+   * Pub/sub mode to run in.
+   */
+  public enum PubSubMode {
+    /**
+     * Publish events to pub/sub, but don't run the query.
+     */
+    PUBLISH_ONLY,
+    /**
+     * Consume events from pub/sub and run the query, but don't publish.
+     */
+    SUBSCRIBE_ONLY,
+    /**
+     * Both publish and consume, but as separate jobs.
+     */
+    COMBINED
+  }
+
+  /**
+   * Coder strategies.
+   */
+  public enum CoderStrategy {
+    /**
+     * Hand-written.
+     */
+    HAND,
+    /**
+     * Avro.
+     */
+    AVRO,
+    /**
+     * Java serialization.
+     */
+    JAVA
+  }
+
+  /**
+   * How to determine resource names.
+   */
+  public enum ResourceNameMode {
+    /** Names are used as provided. */
+    VERBATIM,
+    /** Names are suffixed with the query being run. */
+    QUERY,
+    /** Names are suffixed with the query being run and a random number. */
+    QUERY_AND_SALT
+  }
+
+  /**
+   * Units for rates.
+   */
+  public enum RateUnit {
+    PER_SECOND(1_000_000L),
+    PER_MINUTE(60_000_000L);
+
+    RateUnit(long usPerUnit) {
+      this.usPerUnit = usPerUnit;
+    }
+
+    /**
+     * Number of microseconds per unit.
+     */
+    private final long usPerUnit;
+
+    /**
+     * Number of microseconds between events at given (positive) rate, rounded to nearest.
+     */
+    public long rateToPeriodUs(long rate) {
+      return (usPerUnit + rate / 2) / rate;
+    }
+  }
+
+  /**
+   * Shape of event rate.
+   */
+  public enum RateShape {
+    SQUARE,
+    SINE;
+
+    /**
+     * Number of steps used to approximate sine wave.
+     */
+    private static final int N = 10;
+
+    /**
+     * Return inter-event delay, in microseconds, for each generator
+     * to follow in order to achieve {@code rate} at {@code unit} using {@code numGenerators}.
+     */
+    public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) {
+      return unit.rateToPeriodUs(rate) * numGenerators;
+    }
+
+    /**
+     * Return array of successive inter-event delays, in microseconds, for each generator
+     * to follow in order to achieve this shape with {@code firstRate/nextRate} at
+     * {@code unit} using {@code numGenerators}.
+     */
+    public long[] interEventDelayUs(
+        int firstRate, int nextRate, RateUnit unit, int numGenerators) {
+      if (firstRate == nextRate) {
+        long[] interEventDelayUs = new long[1];
+        interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators;
+        return interEventDelayUs;
+      }
+
+      switch (this) {
+        case SQUARE: {
+          long[] interEventDelayUs = new long[2];
+          interEventDelayUs[0] = unit.rateToPeriodUs(firstRate) * numGenerators;
+          interEventDelayUs[1] = unit.rateToPeriodUs(nextRate) * numGenerators;
+          return interEventDelayUs;
+        }
+        case SINE: {
+          double mid = ((double) firstRate + nextRate) / 2.0; // widen first: int sum may overflow
+          double amp = ((double) firstRate - nextRate) / 2.0; // may be -ve
+          long[] interEventDelayUs = new long[N];
+          for (int i = 0; i < N; i++) {
+            double r = (2.0 * Math.PI * i) / N;
+            double rate = mid + amp * Math.cos(r);
+            interEventDelayUs[i] = unit.rateToPeriodUs(Math.round(rate)) * numGenerators;
+          }
+          return interEventDelayUs;
+        }
+      }
+      throw new RuntimeException("Unexpected rate shape: " + this); // switch should be exhaustive
+    }
+
+    /**
+     * Return delay between steps, in seconds, for result of {@link #interEventDelayUs}, so
+     * as to cycle through the entire sequence every {@code ratePeriodSec}.
+     */
+    public int stepLengthSec(int ratePeriodSec) {
+      int n = 0;
+      switch (this) {
+        case SQUARE:
+          n = 2;
+          break;
+        case SINE:
+          n = N;
+          break;
+      }
+      return (ratePeriodSec + n - 1) / n;
+    }
+  }
+
+  /**
+   * Set to true to capture all info messages. The logging level flags don't currently work.
+   */
+  private static final boolean LOG_INFO = false;
+
+  /**
+   * Set to true to capture all error messages. The logging level flags don't currently work.
+   */
+  private static final boolean LOG_ERROR = true;
+
+  /**
+   * Set to true to log directly to stdout. If run using Google Dataflow, you can watch the results
+   * in real-time with: tail -f /var/log/dataflow/streaming-harness/harness-stdout.log
+   */
+  private static final boolean LOG_TO_CONSOLE = false;
+
+  /**
+   * Log info message.
+   */
+  public static void info(String format, Object... args) {
+    if (LOG_INFO) {
+      LOG.info(String.format(format, args));
+      if (LOG_TO_CONSOLE) {
+        System.out.println(String.format(format, args));
+      }
+    }
+  }
+
+  /**
+   * Log message to console. For client side only.
+   */
+  public static void console(String format, Object... args) {
+    System.out.printf("%s %s%n", Instant.now(), String.format(format, args));
+  }
+
+  /**
+   * Label to use for timestamps on pub/sub messages.
+   */
+  public static final String PUBSUB_TIMESTAMP = "timestamp";
+
+  /**
+   * Label to use for windmill ids on pub/sub messages.
+   */
+  public static final String PUBSUB_ID = "id";
+
+  /**
+   * All events will be given a timestamp relative to this time (ms since epoch).
+   */
+  private static final long BASE_TIME = Instant.parse("2015-07-15T00:00:00.000Z").getMillis();
+
+  /**
+   * Instants guaranteed to be strictly before and after all event timestamps, and which won't
+   * be subject to underflow/overflow.
+   */
+  public static final Instant BEGINNING_OF_TIME = new Instant(0).plus(Duration.standardDays(365));
+  public static final Instant END_OF_TIME =
+      BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(365));
+
+  /**
+   * Set up pipeline with coders and some other options.
+   */
+  public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
+    CoderRegistry registry = p.getCoderRegistry();
+    switch (coderStrategy) {
+      case HAND:
+        registry.registerCoderForClass(Auction.class, Auction.CODER);
+        registry.registerCoderForClass(AuctionBid.class, AuctionBid.CODER);
+        registry.registerCoderForClass(AuctionCount.class, AuctionCount.CODER);
+        registry.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER);
+        registry.registerCoderForClass(Bid.class, Bid.CODER);
+        registry.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER);
+        registry.registerCoderForClass(Event.class, Event.CODER);
+        registry.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER);
+        registry.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER);
+        registry.registerCoderForClass(Person.class, Person.CODER);
+        registry.registerCoderForClass(SellerPrice.class, SellerPrice.CODER);
+        registry.registerCoderForClass(Done.class, Done.CODER);
+        registry.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER);
+        break;
+      case AVRO:
+        registry.registerCoderProvider(AvroCoder.getCoderProvider());
+        break;
+      case JAVA:
+        registry.registerCoderProvider(SerializableCoder.getCoderProvider());
+        break;
+    }
+  }
+
+  /**
+   * Return a generator config to match the given {@code configuration}.
+   */
+  private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
+    return new GeneratorConfig(configuration,
+                               configuration.useWallclockEventTime ? System.currentTimeMillis()
+                                                                   : BASE_TIME, 0,
+                               configuration.numEvents, 0);
+  }
+
+  /**
+   * Return an iterator of events using the 'standard' generator config.
+   */
+  public static Iterator<TimestampedValue<Event>> standardEventIterator(
+      NexmarkConfiguration configuration) {
+    return new Generator(standardGeneratorConfig(configuration));
+  }
+
+  /**
+   * Return a transform which yields a finite number of synthesized events generated
+   * as a batch.
+   */
+  public static PTransform<PBegin, PCollection<Event>> batchEventsSource(
+          NexmarkConfiguration configuration) {
+    return Read.from(new BoundedEventSource(standardGeneratorConfig(configuration),
+      configuration.numEventGenerators));
+  }
+
+  /**
+   * Return a transform which yields a finite number of synthesized events generated
+   * on-the-fly in real time.
+   */
+  public static PTransform<PBegin, PCollection<Event>> streamEventsSource(
+          NexmarkConfiguration configuration) {
+    return Read.from(new UnboundedEventSource(NexmarkUtils.standardGeneratorConfig(configuration),
+                                              configuration.numEventGenerators,
+                                              configuration.watermarkHoldbackSec,
+                                              configuration.isRateLimited));
+  }
+
+  /**
+   * Return a transform to pass-through events, but count them as they go by.
+   */
+  public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
+    return ParDo.of(new DoFn<Event, Event>() {
+      final Counter eventCounter = Metrics.counter(name, "events");
+      final Counter newPersonCounter = Metrics.counter(name, "newPersons");
+      final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
+      final Counter bidCounter = Metrics.counter(name, "bids");
+      final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        eventCounter.inc();
+        if (c.element().newPerson != null) {
+          newPersonCounter.inc();
+        } else if (c.element().newAuction != null) {
+          newAuctionCounter.inc();
+        } else if (c.element().bid != null) {
+          bidCounter.inc();
+        } else {
+          endOfStreamCounter.inc();
+        }
+        info("%s snooping element %s", name, c.element());
+        c.output(c.element());
+      }
+    });
+  }
+
+  /**
+   * Return a transform to count and discard each element.
+   */
+  public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
+    return ParDo.of(new DoFn<T, Void>() {
+      final Counter discardedCounterMetric = Metrics.counter(name, "discarded");
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        discardedCounterMetric.inc();
+      }
+    });
+  }
+
+  /**
+   * Return a transform to log each element, passing it through unchanged.
+   */
+  public static <T> ParDo.SingleOutput<T, T> log(final String name) {
+    return ParDo.of(new DoFn<T, T>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        LOG.info("{}: {}", name, c.element()); // SLF4J substitutes {}, not printf-style %s
+        c.output(c.element());
+      }
+    });
+  }
+
+  /**
+   * Return a transform to format each element as a string.
+   */
+  public static <T> ParDo.SingleOutput<T, String> format(final String name) {
+    return ParDo.of(new DoFn<T, String>() {
+      final Counter recordCounterMetric = Metrics.counter(name, "records");
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        recordCounterMetric.inc();
+        c.output(c.element().toString());
+      }
+    });
+  }
+
+  /**
+   * Return a transform to make explicit the timestamp of each element ({@code name} is unused).
+   */
+  public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
+    return ParDo.of(new DoFn<T, TimestampedValue<T>>() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(TimestampedValue.of(c.element(), c.timestamp()));
+      }
+    });
+  }
+
+  /**
+   * Return a transform to reduce a stream to a single, order-invariant long hash.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<Long>> hash(
+      final long numEvents, String name) {
+    return new PTransform<PCollection<T>, PCollection<Long>>(name) {
+      @Override
+      public PCollection<Long> expand(PCollection<T> input) {
+        return input.apply(Window.<T>into(new GlobalWindows())
+                               .triggering(AfterPane.elementCountAtLeast((int) numEvents))
+                               .withAllowedLateness(Duration.standardDays(1))
+                               .discardingFiredPanes())
+
+                    .apply(name + ".Hash", ParDo.of(new DoFn<T, Long>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        long hash =
+                            Hashing.murmur3_128()
+                                   .newHasher()
+                                   .putLong(c.timestamp().getMillis())
+                                   .putString(c.element().toString(), StandardCharsets.UTF_8)
+                                   .hash()
+                                   .asLong();
+                        c.output(hash);
+                      }
+                    }))
+
+                    .apply(Combine.globally(new Combine.BinaryCombineFn<Long>() {
+                      @Override
+                      public Long apply(Long left, Long right) {
+                        return left ^ right;
+                      }
+                    }));
+      }
+    };
+  }
+
+  private static final long MASK = (1L << 16) - 1L;
+  private static final long HASH = 0x243F6A8885A308D3L;
+  private static final long INIT_PLAINTEXT = 50000L;
+
+  /**
+   * Return a transform to keep the CPU busy for given milliseconds on every record.
+   */
+  public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
+    return ParDo.of(new DoFn<T, T>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    long now = System.currentTimeMillis();
+                    long end = now + delayMs;
+                    while (now < end) {
+                      // Find plaintext which hashes to HASH in lowest MASK bits.
+                      // Values chosen to roughly take 1ms on typical workstation.
+                      long p = INIT_PLAINTEXT;
+                      while (true) {
+                        long t = Hashing.murmur3_128().hashLong(p).asLong();
+                        if ((t & MASK) == (HASH & MASK)) {
+                          break;
+                        }
+                        p++;
+                      }
+                      now = System.currentTimeMillis();
+                    }
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  private static final int MAX_BUFFER_SIZE = 1 << 24;
+
+  private static class DiskBusyTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    // Passes elements through unchanged, writing 'bytes' bytes to keyed state for each one.
+    private final long bytes;
+
+    private DiskBusyTransform(long bytes) {
+      this.bytes = bytes;
+    }
+
+    @Override public PCollection<T> expand(PCollection<T> input) {
+      // Add dummy key to be able to use State API
+      PCollection<KV<Integer, T>> kvCollection = input
+          .apply("diskBusy.keyElements", ParDo.of(new DoFn<T, KV<Integer, T>>() {
+
+            @ProcessElement public void processElement(ProcessContext context) {
+              context.output(KV.of(0, context.element()));
+            }
+          }));
+      // Apply actual transform that generates disk IO using state API
+      PCollection<T> output = kvCollection
+          .apply("diskBusy.generateIO", ParDo.of(new DoFn<KV<Integer, T>, T>() {
+
+            private static final String DISK_BUSY = "diskBusy";
+
+            @StateId(DISK_BUSY) private final StateSpec<ValueState<byte[]>> spec = StateSpecs
+                .value(ByteArrayCoder.of());
+
+            @ProcessElement public void processElement(ProcessContext c,
+                @StateId(DISK_BUSY) ValueState<byte[]> state) {
+              long remain = bytes;
+              long now = System.currentTimeMillis();
+              while (remain > 0) {
+                long thisBytes = Math.min(remain, MAX_BUFFER_SIZE); // cap each write's size
+                remain -= thisBytes;
+                byte[] arr = new byte[(int) thisBytes];
+                for (int i = 0; i < thisBytes; i++) {
+                  arr[i] = (byte) now;
+                }
+                state.write(arr);
+                now = System.currentTimeMillis();
+              }
+              c.output(c.element().getValue());
+            }
+          }));
+      return output;
+    }
+  }
+
+
+  /**
+   * Return a transform to write given number of bytes to durable store on every record.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(final long bytes) {
+    return new DiskBusyTransform<>(bytes);
+  }
+
+  /**
+   * Return a transform to cast each element to {@link KnownSize}.
+   */
+  private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
+    return ParDo.of(new DoFn<T, KnownSize>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element());
+                  }
+                });
+  }
+
+  /**
+   * A coder for instances of {@code T} cast up to {@link KnownSize}.
+   *
+   * @param <T> True type of object.
+   */
+  private static class CastingCoder<T extends KnownSize> extends CustomCoder<KnownSize> {
+    private final Coder<T> trueCoder;
+
+    public CastingCoder(Coder<T> trueCoder) {
+      this.trueCoder = trueCoder;
+    }
+
+    @Override
+    public void encode(KnownSize value, OutputStream outStream)
+        throws CoderException, IOException {
+      @SuppressWarnings("unchecked")
+      T typedValue = (T) value;
+      trueCoder.encode(typedValue, outStream);
+    }
+
+    @Override
+    public KnownSize decode(InputStream inStream)
+        throws CoderException, IOException {
+      return trueCoder.decode(inStream);
+    }
+  }
+
+  /**
+   * Return a coder for {@code KnownSize} that are known to be exactly of type {@code T}.
+   */
+  private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) {
+    return new CastingCoder<>(trueCoder);
+  }
+
+  /**
+   * Return {@code elements} as {@code KnownSize}s.
+   */
+  public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize(
+      final String name, PCollection<T> elements) {
+    return elements.apply(name + ".Forget", castToKnownSize())
+            .setCoder(makeCastingCoder(elements.getCoder()));
+  }
+
+  // Do not instantiate.
+  private NexmarkUtils() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
new file mode 100644
index 0000000..6a37ade
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Auction.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * An auction submitted by a person.
+ */
+public class Auction implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<Auction> CODER = new CustomCoder<Auction>() {
+    @Override
+    public void encode(Auction value, OutputStream outStream)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream);
+      STRING_CODER.encode(value.itemName, outStream);
+      STRING_CODER.encode(value.description, outStream);
+      LONG_CODER.encode(value.initialBid, outStream);
+      LONG_CODER.encode(value.reserve, outStream);
+      LONG_CODER.encode(value.dateTime, outStream);
+      LONG_CODER.encode(value.expires, outStream);
+      LONG_CODER.encode(value.seller, outStream);
+      LONG_CODER.encode(value.category, outStream);
+      STRING_CODER.encode(value.extra, outStream);
+    }
+
+    @Override
+    public Auction decode(
+        InputStream inStream)
+        throws CoderException, IOException {
+      long id = LONG_CODER.decode(inStream);
+      String itemName = STRING_CODER.decode(inStream);
+      String description = STRING_CODER.decode(inStream);
+      long initialBid = LONG_CODER.decode(inStream);
+      long reserve = LONG_CODER.decode(inStream);
+      long dateTime = LONG_CODER.decode(inStream);
+      long expires = LONG_CODER.decode(inStream);
+      long seller = LONG_CODER.decode(inStream);
+      long category = LONG_CODER.decode(inStream);
+      String extra = STRING_CODER.decode(inStream);
+      return new Auction(
+          id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
+          extra);
+    }
+  };
+
+
+  /** Id of auction. */
+  @JsonProperty
+  public final long id; // primary key
+
+  /** Extra auction properties. */
+  @JsonProperty
+  private final String itemName;
+
+  @JsonProperty
+  private final String description;
+
+  /** Initial bid price, in cents. */
+  @JsonProperty
+  private final long initialBid;
+
+  /** Reserve price, in cents. */
+  @JsonProperty
+  public final long reserve;
+
+  @JsonProperty
+  public final long dateTime;
+
+  /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
+  @JsonProperty
+  public final long expires;
+
+  /** Id of person who instigated auction. */
+  @JsonProperty
+  public final long seller; // foreign key: Person.id
+
+  /** Id of category auction is listed under. */
+  @JsonProperty
+  public final long category; // foreign key: Category.id
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  private final String extra;
+
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Auction() {
+    id = 0;
+    itemName = null;
+    description = null;
+    initialBid = 0;
+    reserve = 0;
+    dateTime = 0;
+    expires = 0;
+    seller = 0;
+    category = 0;
+    extra = null;
+  }
+
+  public Auction(long id, String itemName, String description, long initialBid, long reserve,
+      long dateTime, long expires, long seller, long category, String extra) {
+    this.id = id;
+    this.itemName = itemName;
+    this.description = description;
+    this.initialBid = initialBid;
+    this.reserve = reserve;
+    this.dateTime = dateTime;
+    this.expires = expires;
+    this.seller = seller;
+    this.category = category;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of auction which captures the given annotation.
+   * (Used for debugging).
+   */
+  public Auction withAnnotation(String annotation) {
+    return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+        category, annotation + ": " + extra);
+  }
+
+  /**
+   * Does auction have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from auction. (Used for debugging.)
+   */
+  public Auction withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+          category, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
+        + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
new file mode 100644
index 0000000..cb1aac5
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionBid.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.queries.WinningBids;
+
+/**
+ * Result of {@link WinningBids} transform.
+ */
+public class AuctionBid implements KnownSize, Serializable {
+  public static final Coder<AuctionBid> CODER = new CustomCoder<AuctionBid>() {
+    @Override
+    public void encode(AuctionBid value, OutputStream outStream)
+        throws CoderException, IOException {
+      Auction.CODER.encode(value.auction, outStream);
+      Bid.CODER.encode(value.bid, outStream);
+    }
+
+    @Override
+    public AuctionBid decode(
+        InputStream inStream)
+        throws CoderException, IOException {
+      Auction auction = Auction.CODER.decode(inStream);
+      Bid bid = Bid.CODER.decode(inStream);
+      return new AuctionBid(auction, bid);
+    }
+  };
+
+  @JsonProperty
+  public final Auction auction;
+
+  @JsonProperty
+  public final Bid bid;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionBid() {
+    auction = null;
+    bid = null;
+  }
+
+  public AuctionBid(Auction auction, Bid bid) {
+    this.auction = auction;
+    this.bid = bid;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return auction.sizeInBytes() + bid.sizeInBytes();
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
new file mode 100644
index 0000000..4d15d25
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionCount.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of Query5.
+ */
+public class AuctionCount implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /** Coder which writes {@code auction} then {@code count}, both with {@link VarLongCoder}. */
+  public static final Coder<AuctionCount> CODER = new CustomCoder<AuctionCount>() {
+    @Override
+    public void encode(AuctionCount value, OutputStream outStream)
+        throws CoderException, IOException {
+      // Order matters: decode reads the two longs back in this same order.
+      LONG_CODER.encode(value.auction, outStream);
+      LONG_CODER.encode(value.count, outStream);
+    }
+
+    @Override
+    public AuctionCount decode(InputStream inStream) throws CoderException, IOException {
+      long decodedAuction = LONG_CODER.decode(inStream);
+      long decodedCount = LONG_CODER.decode(inStream);
+      return new AuctionCount(decodedAuction, decodedCount);
+    }
+  };
+
+  /** First component, written first by {@link #CODER}. */
+  @JsonProperty
+  private final long auction;
+
+  /** Second component, written second by {@link #CODER}. */
+  @JsonProperty
+  private final long count;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionCount() {
+    this(0, 0);
+  }
+
+  public AuctionCount(long auction, long count) {
+    this.auction = auction;
+    this.count = count;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two longs, counted as 8 bytes each.
+    return 2 * 8;
+  }
+
+  @Override
+  public String toString() {
+    // Rendering is delegated to the shared NexmarkUtils.MAPPER.
+    final String rendered;
+    try {
+      rendered = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException failure) {
+      // toString() cannot declare checked exceptions; rethrow unchecked, keeping the cause.
+      throw new RuntimeException(failure);
+    }
+    return rendered;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
new file mode 100644
index 0000000..f4fe881
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/AuctionPrice.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of Query2.
+ */
+public class AuctionPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /** Coder which writes {@code auction} then {@code price}, both with {@link VarLongCoder}. */
+  public static final Coder<AuctionPrice> CODER = new CustomCoder<AuctionPrice>() {
+    @Override
+    public void encode(AuctionPrice value, OutputStream outStream)
+        throws CoderException, IOException {
+      // Order matters: decode reads the two longs back in this same order.
+      LONG_CODER.encode(value.auction, outStream);
+      LONG_CODER.encode(value.price, outStream);
+    }
+
+    @Override
+    public AuctionPrice decode(InputStream inStream) throws CoderException, IOException {
+      long decodedAuction = LONG_CODER.decode(inStream);
+      long decodedPrice = LONG_CODER.decode(inStream);
+      return new AuctionPrice(decodedAuction, decodedPrice);
+    }
+  };
+
+  /** First component, written first by {@link #CODER}. */
+  @JsonProperty
+  private final long auction;
+
+  /** Price in cents. */
+  @JsonProperty
+  private final long price;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionPrice() {
+    this(0, 0);
+  }
+
+  public AuctionPrice(long auction, long price) {
+    this.auction = auction;
+    this.price = price;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two longs, counted as 8 bytes each.
+    return 2 * 8;
+  }
+
+  @Override
+  public String toString() {
+    // Rendering is delegated to the shared NexmarkUtils.MAPPER.
+    final String rendered;
+    try {
+      rendered = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException failure) {
+      // toString() cannot declare checked exceptions; rethrow unchecked, keeping the cause.
+      throw new RuntimeException(failure);
+    }
+    return rendered;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
new file mode 100644
index 0000000..b465e62
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Bid.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * A bid for an item on auction.
+ */
+public class Bid implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /**
+   * Coder which writes {@code auction}, {@code bidder}, {@code price} and {@code dateTime}
+   * with {@link VarLongCoder}, followed by {@code extra} with {@link StringUtf8Coder}.
+   */
+  public static final Coder<Bid> CODER = new CustomCoder<Bid>() {
+    @Override
+    public void encode(Bid value, OutputStream outStream)
+        throws CoderException, IOException {
+      // Order matters: decode reads the fields back in this same order.
+      LONG_CODER.encode(value.auction, outStream);
+      LONG_CODER.encode(value.bidder, outStream);
+      LONG_CODER.encode(value.price, outStream);
+      LONG_CODER.encode(value.dateTime, outStream);
+      STRING_CODER.encode(value.extra, outStream);
+    }
+
+    @Override
+    public Bid decode(
+        InputStream inStream)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream);
+      long bidder = LONG_CODER.decode(inStream);
+      long price = LONG_CODER.decode(inStream);
+      long dateTime = LONG_CODER.decode(inStream);
+      String extra = STRING_CODER.decode(inStream);
+      return new Bid(auction, bidder, price, dateTime, extra);
+    }
+
+    // Never throws, i.e. this coder is reported as deterministic.
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
+  };
+
+  /**
+   * Comparator to order bids by ascending price then descending time
+   * (for finding winning bids).
+   */
+  public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      // Prices are longs, so compare them as longs: widening to double can make
+      // distinct prices above 2^53 compare as equal.
+      int i = Long.compare(left.price, right.price);
+      if (i != 0) {
+        return i;
+      }
+      // Operands are swapped so that, among equal prices, later bids sort first.
+      return Long.compare(right.dateTime, left.dateTime);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending time then ascending price.
+   * (for finding most recent bids).
+   */
+  public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      int i = Long.compare(left.dateTime, right.dateTime);
+      if (i != 0) {
+        return i;
+      }
+      // Prices are longs, so compare them as longs rather than widening to double.
+      return Long.compare(left.price, right.price);
+    }
+  };
+
+  /** Id of auction this bid is for. */
+  @JsonProperty
+  public final long auction; // foreign key: Auction.id
+
+  /** Id of person bidding in auction. */
+  @JsonProperty
+  public final long bidder; // foreign key: Person.id
+
+  /** Price of bid, in cents. */
+  @JsonProperty
+  public final long price;
+
+  /**
+   * Instant at which bid was made (ms since epoch).
+   * NOTE: This may be earlier than the system's event time.
+   */
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Bid() {
+    auction = 0;
+    bidder = 0;
+    price = 0;
+    dateTime = 0;
+    extra = null;
+  }
+
+  public Bid(long auction, long bidder, long price, long dateTime, String extra) {
+    this.auction = auction;
+    this.bidder = bidder;
+    this.price = price;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of this bid which captures the given annotation as a
+   * {@code "<annotation>: "} prefix of {@code extra}. (Used for debugging.)
+   */
+  public Bid withAnnotation(String annotation) {
+    return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
+  }
+
+  /**
+   * Does bid have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from bid, returning this same instance when the
+   * annotation is not present. (Used for debugging.)
+   */
+  public Bid withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      // Skip the annotation itself plus the two characters of the ": " separator.
+      return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Four longs counted as 8 bytes each, plus extra.length() and one additional byte.
+    return 8 + 8 + 8 + 8 + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      // toString() cannot declare checked exceptions; rethrow unchecked, keeping the cause.
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
new file mode 100644
index 0000000..84e23e7
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/BidsPerSession.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of query 11.
+ */
+public class BidsPerSession implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /**
+   * Coder which writes {@code personId} then {@code bidsPerSession}, both with
+   * {@link VarLongCoder}.
+   */
+  public static final Coder<BidsPerSession> CODER = new CustomCoder<BidsPerSession>() {
+    @Override
+    public void encode(BidsPerSession value, OutputStream outStream)
+        throws CoderException, IOException {
+      // Order matters: decode reads the two longs back in this same order.
+      LONG_CODER.encode(value.personId, outStream);
+      LONG_CODER.encode(value.bidsPerSession, outStream);
+    }
+
+    @Override
+    public BidsPerSession decode(InputStream inStream) throws CoderException, IOException {
+      long decodedPersonId = LONG_CODER.decode(inStream);
+      long decodedBidsPerSession = LONG_CODER.decode(inStream);
+      return new BidsPerSession(decodedPersonId, decodedBidsPerSession);
+    }
+
+    // Never throws, i.e. this coder is reported as deterministic.
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {}
+  };
+
+  /** First component, written first by {@link #CODER}. */
+  @JsonProperty
+  private final long personId;
+
+  /** Second component, written second by {@link #CODER}. */
+  @JsonProperty
+  private final long bidsPerSession;
+
+  /** Creates an instance with both fields set to zero. */
+  public BidsPerSession() {
+    this(0, 0);
+  }
+
+  public BidsPerSession(long personId, long bidsPerSession) {
+    this.personId = personId;
+    this.bidsPerSession = bidsPerSession;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two longs, counted as 8 bytes each.
+    return 2 * 8;
+  }
+
+  @Override
+  public String toString() {
+    // Rendering is delegated to the shared NexmarkUtils.MAPPER.
+    final String rendered;
+    try {
+      rendered = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException failure) {
+      // toString() cannot declare checked exceptions; rethrow unchecked, keeping the cause.
+      throw new RuntimeException(failure);
+    }
+    return rendered;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
new file mode 100644
index 0000000..3b33635
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/CategoryPrice.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of Query4.
+ */
+public class CategoryPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  /**
+   * Coder which writes {@code category} and {@code price} with {@link VarLongCoder}, then
+   * {@code isLast} as the integer 1 or 0 with {@link VarIntCoder}.
+   */
+  public static final Coder<CategoryPrice> CODER = new CustomCoder<CategoryPrice>() {
+    @Override
+    public void encode(CategoryPrice value, OutputStream outStream)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.category, outStream);
+      LONG_CODER.encode(value.price, outStream);
+      // The boolean travels as an int flag: 1 for true, 0 for false.
+      int lastFlag = value.isLast ? 1 : 0;
+      INT_CODER.encode(lastFlag, outStream);
+    }
+
+    @Override
+    public CategoryPrice decode(InputStream inStream) throws CoderException, IOException {
+      long decodedCategory = LONG_CODER.decode(inStream);
+      long decodedPrice = LONG_CODER.decode(inStream);
+      // Any non-zero flag is read back as true.
+      int lastFlag = INT_CODER.decode(inStream);
+      return new CategoryPrice(decodedCategory, decodedPrice, lastFlag != 0);
+    }
+
+    // Never throws, i.e. this coder is reported as deterministic.
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {}
+  };
+
+  /** First component, written first by {@link #CODER}. */
+  @JsonProperty
+  public final long category;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  /** Flag written last by {@link #CODER}. */
+  @JsonProperty
+  public final boolean isLast;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private CategoryPrice() {
+    this(0, 0, false);
+  }
+
+  public CategoryPrice(long category, long price, boolean isLast) {
+    this.category = category;
+    this.price = price;
+    this.isLast = isLast;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two longs counted as 8 bytes each, plus one byte for the boolean.
+    return 2 * 8 + 1;
+  }
+
+  @Override
+  public String toString() {
+    // Rendering is delegated to the shared NexmarkUtils.MAPPER.
+    final String rendered;
+    try {
+      rendered = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException failure) {
+      // toString() cannot declare checked exceptions; rethrow unchecked, keeping the cause.
+      throw new RuntimeException(failure);
+    }
+    return rendered;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java
new file mode 100644
index 0000000..e285041
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Done.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of query 10.
+ */
+public class Done implements KnownSize, Serializable {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<Done> CODER = new CustomCoder<Done>() {
+    @Override
+    public void encode(Done value, OutputStream outStream)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.message, outStream);
+    }
+
+    @Override
+    public Done decode(InputStream inStream)
+        throws CoderException, IOException {
+      String message = STRING_CODER.decode(inStream);
+      return new Done(message);
+    }
+    @Override public void verifyDeterministic() throws NonDeterministicException {}
+  };
+
+  @JsonProperty
+  private final String message;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  public Done() {
+    message = null;
+  }
+
+  public Done(String message) {
+    this.message = message;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return message.length();
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
new file mode 100644
index 0000000..880cfe4
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Event.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+/**
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction}, or a
+ * {@link Bid}.
+ */
+public class Event implements KnownSize, Serializable {
+  /** Discriminator written ahead of the payload to record which kind of event follows. */
+  private enum Tag {
+    PERSON(0),
+    AUCTION(1),
+    BID(2);
+
+    /** Integer written to / read from the encoded stream for this tag. */
+    private final int value;
+
+    Tag(int value) {
+      this.value = value;
+    }
+  }
+
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  /**
+   * Coder which writes a {@link Tag} value with {@link VarIntCoder}, followed by the one
+   * non-null payload encoded with that payload type's own coder.
+   */
+  public static final Coder<Event> CODER =
+      new CustomCoder<Event>() {
+        @Override
+        public void encode(Event value, OutputStream outStream) throws IOException {
+          // The first non-null payload, checked in person / auction / bid order, is written.
+          if (value.newPerson != null) {
+            INT_CODER.encode(Tag.PERSON.value, outStream);
+            Person.CODER.encode(value.newPerson, outStream);
+          } else if (value.newAuction != null) {
+            INT_CODER.encode(Tag.AUCTION.value, outStream);
+            Auction.CODER.encode(value.newAuction, outStream);
+          } else if (value.bid != null) {
+            INT_CODER.encode(Tag.BID.value, outStream);
+            Bid.CODER.encode(value.bid, outStream);
+          } else {
+            throw new RuntimeException("invalid event");
+          }
+        }
+
+        @Override
+        public Event decode(InputStream inStream) throws IOException {
+          // The leading tag selects which payload coder reads the rest.
+          int tag = INT_CODER.decode(inStream);
+          if (tag == Tag.PERSON.value) {
+            Person person = Person.CODER.decode(inStream);
+            return new Event(person);
+          } else if (tag == Tag.AUCTION.value) {
+            Auction auction = Auction.CODER.decode(inStream);
+            return new Event(auction);
+          } else if (tag == Tag.BID.value) {
+            Bid bid = Bid.CODER.decode(inStream);
+            return new Event(bid);
+          } else {
+            throw new RuntimeException("invalid event encoding");
+          }
+        }
+
+        // Never throws, i.e. this coder is reported as deterministic.
+        @Override
+        public void verifyDeterministic() throws NonDeterministicException {}
+      };
+
+  /** Payload when this event is a new person; null otherwise. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Person newPerson;
+
+  /** Payload when this event is a new auction; null otherwise. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Auction newAuction;
+
+  /** Payload when this event is a bid; null otherwise. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Bid bid;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Event() {
+    newPerson = null;
+    newAuction = null;
+    bid = null;
+  }
+
+  public Event(Person newPerson) {
+    this.newPerson = newPerson;
+    newAuction = null;
+    bid = null;
+  }
+
+  public Event(Auction newAuction) {
+    newPerson = null;
+    this.newAuction = newAuction;
+    bid = null;
+  }
+
+  public Event(Bid bid) {
+    newPerson = null;
+    newAuction = null;
+    this.bid = bid;
+  }
+
+  /**
+   * Return a copy of event which captures {@code annotation}. (Used for debugging).
+   *
+   * @throws RuntimeException if this event carries no payload at all
+   */
+  public Event withAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withAnnotation(annotation));
+    } else if (bid != null) {
+      return new Event(bid.withAnnotation(annotation));
+    } else {
+      // Same failure as sizeInBytes()/toString() instead of a bare NullPointerException.
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Does event have {@code annotation}? (Used for debugging.)
+   *
+   * @throws RuntimeException if this event carries no payload at all
+   */
+  public boolean hasAnnotation(String annotation) {
+    if (newPerson != null) {
+      return newPerson.hasAnnotation(annotation);
+    } else if (newAuction != null) {
+      return newAuction.hasAnnotation(annotation);
+    } else if (bid != null) {
+      return bid.hasAnnotation(annotation);
+    } else {
+      // Same failure as sizeInBytes()/toString() instead of a bare NullPointerException.
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // One byte is added on top of the payload's own reported size.
+    if (newPerson != null) {
+      return 1 + newPerson.sizeInBytes();
+    } else if (newAuction != null) {
+      return 1 + newAuction.sizeInBytes();
+    } else if (bid != null) {
+      return 1 + bid.sizeInBytes();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  @Override
+  public String toString() {
+    // Delegates to whichever payload is present.
+    if (newPerson != null) {
+      return newPerson.toString();
+    } else if (newAuction != null) {
+      return newAuction.toString();
+    } else if (bid != null) {
+      return bid.toString();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
new file mode 100644
index 0000000..0519f5d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/IdNameReserve.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result type of Query8: an id, a name and a reserve price.
+ */
+public class IdNameReserve implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Coder which writes {@code id}, {@code name} and {@code reserve}, in that order. */
+  public static final Coder<IdNameReserve> CODER = new CustomCoder<IdNameReserve>() {
+    @Override
+    public void encode(IdNameReserve value, OutputStream outStream)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream);
+      STRING_CODER.encode(value.name, outStream);
+      LONG_CODER.encode(value.reserve, outStream);
+    }
+
+    @Override
+    public IdNameReserve decode(InputStream inStream) throws CoderException, IOException {
+      // Arguments are evaluated left to right, so fields are read back in encoding order.
+      return new IdNameReserve(
+          LONG_CODER.decode(inStream),
+          STRING_CODER.decode(inStream),
+          LONG_CODER.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // No-op: this coder never reports non-determinism.
+    }
+  };
+
+  @JsonProperty
+  private final long id;
+
+  @JsonProperty
+  private final String name;
+
+  /** Reserve price in cents. */
+  @JsonProperty
+  private final long reserve;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private IdNameReserve() {
+    this(0, null, 0);
+  }
+
+  public IdNameReserve(long id, String name, long reserve) {
+    this.id = id;
+    this.name = name;
+    this.reserve = reserve;
+  }
+
+  /** Return 8 for each of the two longs plus the length of the name and one more for it. */
+  @Override
+  public long sizeInBytes() {
+    int nameBytes = name.length() + 1;
+    return 8 + nameBytes + 8;
+  }
+
+  /** Return this value as written by {@code NexmarkUtils.MAPPER}. */
+  @Override
+  public String toString() {
+    final String json;
+    try {
+      json = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
new file mode 100644
index 0000000..45af3fc
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/KnownSize.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+/**
+ * Interface for elements which can quickly estimate their encoded byte size.
+ */
+public interface KnownSize {
+  /** Return the estimated encoded size of this element, in bytes. */
+  long sizeInBytes();
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
new file mode 100644
index 0000000..55fca62
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/NameCityStateId.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of Query3: a name, a city, a state and an id.
+ */
+public class NameCityStateId implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Coder which writes {@code name}, {@code city}, {@code state} and {@code id}, in order. */
+  public static final Coder<NameCityStateId> CODER = new CustomCoder<NameCityStateId>() {
+    @Override
+    public void encode(NameCityStateId value, OutputStream outStream)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.name, outStream);
+      STRING_CODER.encode(value.city, outStream);
+      STRING_CODER.encode(value.state, outStream);
+      LONG_CODER.encode(value.id, outStream);
+    }
+
+    @Override
+    public NameCityStateId decode(InputStream inStream) throws CoderException, IOException {
+      // Arguments are evaluated left to right, so fields are read back in encoding order.
+      return new NameCityStateId(
+          STRING_CODER.decode(inStream),
+          STRING_CODER.decode(inStream),
+          STRING_CODER.decode(inStream),
+          LONG_CODER.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // No-op: this coder never reports non-determinism.
+    }
+  };
+
+  @JsonProperty
+  private final String name;
+
+  @JsonProperty
+  private final String city;
+
+  @JsonProperty
+  private final String state;
+
+  @JsonProperty
+  private final long id;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private NameCityStateId() {
+    this(null, null, null, 0);
+  }
+
+  public NameCityStateId(String name, String city, String state, long id) {
+    this.name = name;
+    this.city = city;
+    this.state = state;
+    this.id = id;
+  }
+
+  /** Return the length of each string plus one more for it, plus 8 for the id. */
+  @Override
+  public long sizeInBytes() {
+    int nameBytes = name.length() + 1;
+    int cityBytes = city.length() + 1;
+    int stateBytes = state.length() + 1;
+    return nameBytes + cityBytes + stateBytes + 8;
+  }
+
+  /** Return this value as written by {@code NexmarkUtils.MAPPER}. */
+  @Override
+  public String toString() {
+    final String json;
+    try {
+      json = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
new file mode 100644
index 0000000..800f937
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/Person.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * A person either creating an auction or making a bid.
+ */
+public class Person implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Coder which writes every field of a person in declaration order. */
+  public static final Coder<Person> CODER = new CustomCoder<Person>() {
+    @Override
+    public void encode(Person value, OutputStream outStream)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream);
+      STRING_CODER.encode(value.name, outStream);
+      STRING_CODER.encode(value.emailAddress, outStream);
+      STRING_CODER.encode(value.creditCard, outStream);
+      STRING_CODER.encode(value.city, outStream);
+      STRING_CODER.encode(value.state, outStream);
+      LONG_CODER.encode(value.dateTime, outStream);
+      STRING_CODER.encode(value.extra, outStream);
+    }
+
+    @Override
+    public Person decode(InputStream inStream) throws CoderException, IOException {
+      // Arguments are evaluated left to right, so fields are read back in encoding order.
+      return new Person(
+          LONG_CODER.decode(inStream), // id
+          STRING_CODER.decode(inStream), // name
+          STRING_CODER.decode(inStream), // emailAddress
+          STRING_CODER.decode(inStream), // creditCard
+          STRING_CODER.decode(inStream), // city
+          STRING_CODER.decode(inStream), // state
+          LONG_CODER.decode(inStream), // dateTime
+          STRING_CODER.decode(inStream)); // extra
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // No-op: this coder never reports non-determinism.
+    }
+  };
+
+  /** Id of person. */
+  @JsonProperty
+  public final long id; // primary key
+
+  /** Name of person. */
+  @JsonProperty
+  public final String name;
+
+  @JsonProperty
+  private final String emailAddress;
+
+  @JsonProperty
+  private final String creditCard;
+
+  @JsonProperty
+  public final String city;
+
+  @JsonProperty
+  public final String state;
+
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  private final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Person() {
+    this(0, null, null, null, null, null, 0, null);
+  }
+
+  public Person(long id, String name, String emailAddress, String creditCard, String city,
+      String state, long dateTime, String extra) {
+    this.id = id;
+    this.name = name;
+    this.emailAddress = emailAddress;
+    this.creditCard = creditCard;
+    this.city = city;
+    this.state = state;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of person which captures the given annotation, stored as a
+   * {@code "<annotation>: "} prefix of the extra payload. (Used for debugging.)
+   */
+  public Person withAnnotation(String annotation) {
+    String annotatedExtra = annotation + ": " + extra;
+    return new Person(id, name, emailAddress, creditCard, city, state, dateTime, annotatedExtra);
+  }
+
+  /**
+   * Does person have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    String prefix = annotation + ": ";
+    return extra.startsWith(prefix);
+  }
+
+  /**
+   * Remove {@code annotation} from person, or return this person unchanged if it does not
+   * carry that annotation. (Used for debugging.)
+   */
+  public Person withoutAnnotation(String annotation) {
+    if (!hasAnnotation(annotation)) {
+      return this;
+    }
+    // Skip the annotation itself and the two characters of the ": " separator.
+    String strippedExtra = extra.substring(annotation.length() + 2);
+    return new Person(id, name, emailAddress, creditCard, city, state, dateTime, strippedExtra);
+  }
+
+  /**
+   * Return 8 for each of the two longs plus, for each of the six strings, its length and one
+   * more.
+   */
+  @Override
+  public long sizeInBytes() {
+    int stringChars = name.length() + emailAddress.length() + creditCard.length()
+        + city.length() + state.length() + extra.length();
+    return 8 + 8 + 6 + stringChars;
+  }
+
+  /** Return this person as written by {@code NexmarkUtils.MAPPER}. */
+  @Override
+  public String toString() {
+    final String json;
+    try {
+      json = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
new file mode 100644
index 0000000..82b551c
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/SellerPrice.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+
+/**
+ * Result of Query6: a seller and a price.
+ */
+public class SellerPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  /** Coder which writes {@code seller} followed by {@code price}. */
+  public static final Coder<SellerPrice> CODER = new CustomCoder<SellerPrice>() {
+    @Override
+    public void encode(SellerPrice value, OutputStream outStream)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.seller, outStream);
+      LONG_CODER.encode(value.price, outStream);
+    }
+
+    @Override
+    public SellerPrice decode(InputStream inStream) throws CoderException, IOException {
+      // Arguments are evaluated left to right, so fields are read back in encoding order.
+      return new SellerPrice(LONG_CODER.decode(inStream), LONG_CODER.decode(inStream));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      // No-op: this coder never reports non-determinism.
+    }
+  };
+
+  @JsonProperty
+  public final long seller;
+
+  /** Price in cents. */
+  @JsonProperty
+  private final long price;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private SellerPrice() {
+    this(0, 0);
+  }
+
+  public SellerPrice(long seller, long price) {
+    this.seller = seller;
+    this.price = price;
+  }
+
+  /** Return 8 for each of the two longs. */
+  @Override
+  public long sizeInBytes() {
+    return 2 * 8;
+  }
+
+  /** Return this value as written by {@code NexmarkUtils.MAPPER}. */
+  @Override
+  public String toString() {
+    final String json;
+    try {
+      json = NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException cause) {
+      throw new RuntimeException(cause);
+    }
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
new file mode 100644
index 0000000..3b4bb63
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Nexmark Benchmark Model.
+ */
+package org.apache.beam.sdk.nexmark.model;

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java
new file mode 100644
index 0000000..7500a24
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Nexmark.
+ */
+package org.apache.beam.sdk.nexmark;