You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:14 UTC

[06/55] [abbrv] beam git commit: NexMark

NexMark

Port unit tests, cleanup pom and add license to readme


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f08970a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f08970a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f08970a

Branch: refs/heads/master
Commit: 1f08970a8fdc9c5e5613227031125d9d929ca841
Parents: f0ce31b
Author: Mark Shields <ma...@google.com>
Authored: Mon Mar 28 16:25:29 2016 -0700
Committer: Ismaël Mejía <ie...@gmail.com>
Committed: Wed Aug 23 19:07:26 2017 +0200

----------------------------------------------------------------------
 integration/java/pom.xml                        | 288 +++++++
 .../integration/nexmark/AbstractSimulator.java  | 212 ++++++
 .../beam/integration/nexmark/Auction.java       | 190 +++++
 .../beam/integration/nexmark/AuctionBid.java    |  87 +++
 .../beam/integration/nexmark/AuctionCount.java  |  90 +++
 .../beam/integration/nexmark/AuctionPrice.java  |  91 +++
 .../integration/nexmark/BEAM_ON_FLINK_ON_GCP.md | 282 +++++++
 .../apache/beam/integration/nexmark/Bid.java    | 178 +++++
 .../integration/nexmark/BidsPerSession.java     |  89 +++
 .../integration/nexmark/BoundedEventSource.java | 197 +++++
 .../beam/integration/nexmark/CategoryPrice.java | 100 +++
 .../apache/beam/integration/nexmark/Done.java   |  83 +++
 .../apache/beam/integration/nexmark/Event.java  | 181 +++++
 .../beam/integration/nexmark/Generator.java     | 590 +++++++++++++++
 .../integration/nexmark/GeneratorConfig.java    | 295 ++++++++
 .../beam/integration/nexmark/IdNameReserve.java | 100 +++
 .../beam/integration/nexmark/KnownSize.java     |  27 +
 .../beam/integration/nexmark/Monitor.java       | 102 +++
 .../integration/nexmark/NameCityStateId.java    | 106 +++
 .../nexmark/NexmarkConfiguration.java           | 662 ++++++++++++++++
 .../beam/integration/nexmark/NexmarkDriver.java | 297 ++++++++
 .../integration/nexmark/NexmarkFlinkDriver.java |  49 ++
 .../integration/nexmark/NexmarkFlinkRunner.java |  67 ++
 .../nexmark/NexmarkGoogleDriver.java            |  90 +++
 .../nexmark/NexmarkGoogleRunner.java            | 660 ++++++++++++++++
 .../nexmark/NexmarkInProcessDriver.java         |  48 ++
 .../nexmark/NexmarkInProcessRunner.java         |  77 ++
 .../beam/integration/nexmark/NexmarkPerf.java   | 212 ++++++
 .../beam/integration/nexmark/NexmarkQuery.java  | 276 +++++++
 .../integration/nexmark/NexmarkQueryModel.java  | 123 +++
 .../beam/integration/nexmark/NexmarkRunner.java | 746 +++++++++++++++++++
 .../beam/integration/nexmark/NexmarkSuite.java  | 112 +++
 .../beam/integration/nexmark/NexmarkUtils.java  | 681 +++++++++++++++++
 .../beam/integration/nexmark/Options.java       | 360 +++++++++
 .../apache/beam/integration/nexmark/Person.java | 166 +++++
 .../beam/integration/nexmark/PubsubHelper.java  | 217 ++++++
 .../apache/beam/integration/nexmark/Query0.java |  72 ++
 .../beam/integration/nexmark/Query0Model.java   |  62 ++
 .../apache/beam/integration/nexmark/Query1.java |  64 ++
 .../beam/integration/nexmark/Query10.java       | 378 ++++++++++
 .../beam/integration/nexmark/Query11.java       |  76 ++
 .../beam/integration/nexmark/Query12.java       |  79 ++
 .../beam/integration/nexmark/Query1Model.java   |  73 ++
 .../apache/beam/integration/nexmark/Query2.java |  75 ++
 .../beam/integration/nexmark/Query2Model.java   |  76 ++
 .../apache/beam/integration/nexmark/Query3.java | 248 ++++++
 .../beam/integration/nexmark/Query3Model.java   | 119 +++
 .../apache/beam/integration/nexmark/Query4.java | 110 +++
 .../beam/integration/nexmark/Query4Model.java   | 181 +++++
 .../apache/beam/integration/nexmark/Query5.java | 127 ++++
 .../beam/integration/nexmark/Query5Model.java   | 174 +++++
 .../apache/beam/integration/nexmark/Query6.java | 154 ++++
 .../beam/integration/nexmark/Query6Model.java   | 128 ++++
 .../apache/beam/integration/nexmark/Query7.java |  87 +++
 .../beam/integration/nexmark/Query7Model.java   | 128 ++++
 .../apache/beam/integration/nexmark/Query8.java |  92 +++
 .../beam/integration/nexmark/Query8Model.java   | 145 ++++
 .../apache/beam/integration/nexmark/Query9.java |  40 +
 .../beam/integration/nexmark/Query9Model.java   |  44 ++
 .../apache/beam/integration/nexmark/README.md   | 166 +++++
 .../beam/integration/nexmark/SellerPrice.java   |  91 +++
 .../nexmark/UnboundedEventSource.java           | 329 ++++++++
 .../beam/integration/nexmark/WinningBids.java   | 378 ++++++++++
 .../nexmark/WinningBidsSimulator.java           | 203 +++++
 .../nexmark/BoundedEventSourceTest.java         |  71 ++
 .../beam/integration/nexmark/GeneratorTest.java | 111 +++
 .../beam/integration/nexmark/QueryTest.java     | 103 +++
 .../nexmark/UnboundedEventSourceTest.java       | 109 +++
 68 files changed, 12424 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/pom.xml
----------------------------------------------------------------------
diff --git a/integration/java/pom.xml b/integration/java/pom.xml
new file mode 100644
index 0000000..b160b56
--- /dev/null
+++ b/integration/java/pom.xml
@@ -0,0 +1,288 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>parent</artifactId>
+    <version>0.2.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>java-integration-all</artifactId>
+
+  <name>Apache Beam :: Integration Tests :: Java All</name>
+
+  <packaging>jar</packaging>
+
+  <properties>
+    <skipITs>true</skipITs>
+  </properties>
+
+  <build>
+    <extensions>
+      <!-- Use os-maven-plugin to initialize the "os.detected" properties -->
+      <extension>
+        <groupId>kr.motd.maven</groupId>
+        <artifactId>os-maven-plugin</artifactId>
+        <version>1.4.0.Final</version>
+      </extension>
+    </extensions>
+
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamTestPipelineOptions>
+            </beamTestPipelineOptions>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <artifactSet>
+                <includes>
+                  <include>*:*</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <!-- Avro plugin for automatic code generation -->
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <id>schemas</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/src/main/</sourceDirectory>
+              <outputDirectory>${project.build.directory}/generated-sources/java</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Coverage analysis for unit tests. -->
+      <plugin>
+        <groupId>org.jacoco</groupId>
+        <artifactId>jacoco-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- Java SDK -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+    </dependency>
+
+    <!-- Java runner for Google Cloud Dataflow -->
+    <dependency>
+      <groupId>org.apache.beam.runners</groupId>
+      <artifactId>google-cloud-dataflow-java</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Direct runner -->
+    <dependency>
+      <groupId>org.apache.beam.runners</groupId>
+      <artifactId>direct</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- Java runner for Flink -->
+    <dependency>
+      <groupId>org.apache.beam.runners</groupId>
+      <artifactId>flink_2.10</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-shaded-hadoop2</artifactId>
+      <version>1.0.3</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Extra libraries -->
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-dataflow</artifactId>
+      <version>${dataflow.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>${bigquery.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>gcsio</artifactId>
+      <version>${google-cloud-bigdataoss.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+      <version>${avro.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <version>${jsr305.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-tcnative-boringssl-static</artifactId>
+      <version>1.1.33.Fork13</version>
+      <classifier>${os.detected.classifier}</classifier>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
new file mode 100644
index 0000000..6473c35
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AbstractSimulator.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Abstract base class for simulator of a query.
+ *
+ * @param <InputT> Type of input elements.
+ * @param <OutputT> Type of output elements.
+ */
+abstract class AbstractSimulator<InputT, OutputT> {
+  /** Size of the windows into which result timestamps are bucketed when counting results. */
+  public static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+
+  /** Input event stream we should draw from. */
+  private final Iterator<TimestampedValue<InputT>> input;
+
+  /** Set to true when no more results. */
+  private boolean isDone;
+
+  /**
+   * Results which have not yet been returned by the {@link #results} iterator.
+   */
+  private final List<TimestampedValue<OutputT>> pendingResults;
+
+  /**
+   * Current window timestamp (ms since epoch).
+   */
+  private long currentWindow;
+
+  /**
+   * Number of (possibly intermediate) results for the current window.
+   */
+  private long currentCount;
+
+  /**
+   * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
+   * iterator.
+   */
+  private final List<Long> pendingCounts;
+
+  public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
+    this.input = input;
+    isDone = false;
+    pendingResults = new ArrayList<>();
+    currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentCount = 0;
+    pendingCounts = new ArrayList<>();
+  }
+
+  /** Called by implementors of {@link #run}: Fetch the next input element. */
+  @Nullable
+  protected TimestampedValue<InputT> nextInput() {
+    if (!input.hasNext()) {
+      return null;
+    }
+    TimestampedValue<InputT> timestampedInput = input.next();
+    NexmarkUtils.info("input: %s", timestampedInput);
+    return timestampedInput;
+  }
+
+  /**
+   * Called by implementors of {@link #run}:  Capture an intermediate result, for the purpose of
+   * recording the expected activity of the query over time.
+   */
+  protected void addIntermediateResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("intermediate result: %s", result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
+   * semantic correctness.
+   */
+  protected void addResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("result: %s", result);
+    pendingResults.add(result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Update window and counts.
+   */
+  private void updateCounts(Instant timestamp) {
+    long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
+    if (window > currentWindow) {
+      if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+        pendingCounts.add(currentCount);
+      }
+      currentCount = 0;
+      currentWindow = window;
+    }
+    currentCount++;
+  }
+
+  /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
+  protected void allDone() {
+    isDone = true;
+  }
+
+  /**
+   * Overridden by derived classes to do the next increment of work. Each call should
+   * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
+   * or {@link #allDone}. It is ok for a single call to emit more than one result via
+   * {@link #addResult}. It is ok for a single call to run the entire simulation, though
+   * this will cause the {@link #results} and {@link #resultsPerWindow} iterators to
+   * stall until the whole simulation has finished.
+   */
+  protected abstract void run();
+
+  /**
+   * Return iterator over all expected timestamped results. The underlying simulator state is
+   * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
+   */
+  public Iterator<TimestampedValue<OutputT>> results() {
+    return new Iterator<TimestampedValue<OutputT>>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingResults.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            return false;
+          }
+          run();
+        }
+      }
+
+      @Override
+      public TimestampedValue<OutputT> next() {
+        TimestampedValue<OutputT> result = pendingResults.get(0);
+        pendingResults.remove(0);
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
+   * simulator state is changed.  Only one of {@link #results} or {@link #resultsPerWindow} can be
+   * called.
+   */
+  public Iterator<Long> resultsPerWindow() {
+    return new Iterator<Long>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingCounts.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            if (currentCount > 0) {
+              pendingCounts.add(currentCount);
+              currentCount = 0;
+              currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+              return true;
+            } else {
+              return false;
+            }
+          }
+          run();
+        }
+      }
+
+      @Override
+      public Long next() {
+        Long result = pendingCounts.get(0);
+        pendingCounts.remove(0);
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java
new file mode 100644
index 0000000..94f2647
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Auction.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * An auction submitted by a person.
+ */
+public class Auction implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  public static final Coder<Auction> CODER = new AtomicCoder<Auction>() {
+    @Override
+    public void encode(Auction value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.id, outStream, Context.NESTED);
+      STRING_CODER.encode(value.itemName, outStream, Context.NESTED);
+      STRING_CODER.encode(value.description, outStream, Context.NESTED);
+      LONG_CODER.encode(value.initialBid, outStream, Context.NESTED);
+      LONG_CODER.encode(value.reserve, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      LONG_CODER.encode(value.expires, outStream, Context.NESTED);
+      LONG_CODER.encode(value.seller, outStream, Context.NESTED);
+      LONG_CODER.encode(value.category, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Auction decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long id = LONG_CODER.decode(inStream, Context.NESTED);
+      String itemName = STRING_CODER.decode(inStream, Context.NESTED);
+      String description = STRING_CODER.decode(inStream, Context.NESTED);
+      long initialBid = LONG_CODER.decode(inStream, Context.NESTED);
+      long reserve = LONG_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      long expires = LONG_CODER.decode(inStream, Context.NESTED);
+      long seller = LONG_CODER.decode(inStream, Context.NESTED);
+      long category = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Auction(
+          id, itemName, description, initialBid, reserve, dateTime, expires, seller, category,
+          extra);
+    }
+  };
+
+
+  /** Id of auction. */
+  @JsonProperty
+  public final long id; // primary key
+
+  /** Extra auction properties. */
+  @JsonProperty
+  public final String itemName;
+
+  @JsonProperty
+  public final String description;
+
+  /** Initial bid price, in cents. */
+  @JsonProperty
+  public final long initialBid;
+
+  /** Reserve price, in cents. */
+  @JsonProperty
+  public final long reserve;
+
+  @JsonProperty
+  public final long dateTime;
+
+  /** When does auction expire? (ms since epoch). Bids at or after this time are ignored. */
+  @JsonProperty
+  public final long expires;
+
+  /** Id of person who instigated auction. */
+  @JsonProperty
+  public final long seller; // foreign key: Person.id
+
+  /** Id of category auction is listed under. */
+  @JsonProperty
+  public final long category; // foreign key: Category.id
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Auction() {
+    id = 0;
+    itemName = null;
+    description = null;
+    initialBid = 0;
+    reserve = 0;
+    dateTime = 0;
+    expires = 0;
+    seller = 0;
+    category = 0;
+    extra = null;
+  }
+
+  public Auction(long id, String itemName, String description, long initialBid, long reserve,
+      long dateTime, long expires, long seller, long category, String extra) {
+    this.id = id;
+    this.itemName = itemName;
+    this.description = description;
+    this.initialBid = initialBid;
+    this.reserve = reserve;
+    this.dateTime = dateTime;
+    this.expires = expires;
+    this.seller = seller;
+    this.category = category;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of the auction that captures the given annotation.
+   * (Used for debugging).
+   */
+  public Auction withAnnotation(String annotation) {
+    return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+        category, annotation + ": " + extra);
+  }
+
+  /**
+   * Does auction have {@code annotation}? (Used for debugging.)
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from auction. (Used for debugging.)
+   */
+  public Auction withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      return new Auction(id, itemName, description, initialBid, reserve, dateTime, expires, seller,
+          category, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + itemName.length() + 1 + description.length() + 1 + 8 + 8 + 8 + 8 + 8 + 8
+        + extra.length() + 1;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
new file mode 100644
index 0000000..8c3697a
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionBid.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link WinningBids} transform.
+ */
+public class AuctionBid implements KnownSize, Serializable {
+  public static final Coder<AuctionBid> CODER = new AtomicCoder<AuctionBid>() {
+    @Override
+    public void encode(AuctionBid value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      Auction.CODER.encode(value.auction, outStream, Context.NESTED);
+      Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionBid decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+      Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+      return new AuctionBid(auction, bid);
+    }
+  };
+
+  @JsonProperty
+  public final Auction auction;
+
+  @JsonProperty
+  public final Bid bid;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionBid() {
+    auction = null;
+    bid = null;
+  }
+
+  public AuctionBid(Auction auction, Bid bid) {
+    this.auction = auction;
+    this.bid = bid;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return auction.sizeInBytes() + bid.sizeInBytes();
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
new file mode 100644
index 0000000..a0fbebc
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionCount.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query5}.
+ */
+public class AuctionCount implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<AuctionCount> CODER = new AtomicCoder<AuctionCount>() {
+    @Override
+    public void encode(AuctionCount value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.count, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionCount decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long count = LONG_CODER.decode(inStream, Context.NESTED);
+      return new AuctionCount(auction, count);
+    }
+  };
+
+  @JsonProperty
+  public final long auction;
+
+  @JsonProperty
+  public final long count;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionCount() {
+    auction = 0;
+    count = 0;
+  }
+
+  public AuctionCount(long auction, long count) {
+    this.auction = auction;
+    this.count = count;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
new file mode 100644
index 0000000..4f25a9b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/AuctionPrice.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query2}.
+ */
+public class AuctionPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<AuctionPrice> CODER = new AtomicCoder<AuctionPrice>() {
+    @Override
+    public void encode(AuctionPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+    }
+
+    @Override
+    public AuctionPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      return new AuctionPrice(auction, price);
+    }
+  };
+
+  @JsonProperty
+  public final long auction;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private AuctionPrice() {
+    auction = 0;
+    price = 0;
+  }
+
+  public AuctionPrice(long auction, long price) {
+    this.auction = auction;
+    this.price = price;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md
new file mode 100644
index 0000000..d1b51e8
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BEAM_ON_FLINK_ON_GCP.md
@@ -0,0 +1,282 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Running NexMark on Beam on Flink on Google Cloud Platform
+
+Here's how to create a cluster of VMs on Google Cloud Platform, deploy
+Flink to them, and invoke a NexMark pipeline using the Beam-on-Flink
+runner.
+
+These instructions are somewhat baroque and I hope they can be
+simplified over time.
+
+## Prerequisites
+
+You'll need:
+
+* the Google Cloud SDK
+* a clone of the Beam repository
+* a Flink binary distribution
+* a project on Google Cloud Platform.
+
+## Establish the shell environment
+
+```
+# Beam root
+BEAM=<path to Beam source directory>
+# Flink root
+FLINK_VER=flink-1.0.3
+FLINK=<path to Flink distribution directory>
+# Google Cloud project
+PROJECT=<your project id>
+# Google Cloud zone
+ZONE=<your project zone>
+# Cloud commands
+GCLOUD=<path to gcloud command>
+GSUTIL=<path to gsutil command>
+```
+
+## Establish VM names for Flink master and workers
+
+```
+MASTER=flink-m
+NUM_WORKERS=5
+WORKERS=""
+for (( i = 0; i < $NUM_WORKERS; i++ )); do
+  WORKERS="$WORKERS flink-w-$i"
+done
+ALL="$MASTER $WORKERS"
+```
+
+## Build Beam
+
+```
+( cd $BEAM && mvn clean install )
+```
+
+## Bring up the cluster
+
+Establish project defaults and authenticate:
+```
+$GCLOUD init
+$GCLOUD auth login
+```
+
+Build Google Cloud Dataproc cluster:
+```
+$GCLOUD beta dataproc clusters create \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  --bucket=nexmark \
+  --scopes=cloud-platform \
+  --num-workers=$NUM_WORKERS \
+  --image-version=preview \
+  flink
+```
+
+Force google_compute_engine ssh keys to be generated locally:
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command 'exit'
+```
+
+Open ports on the VMs:
+```
+$GCLOUD compute firewall-rules create allow-monitoring --allow tcp:8080-8081
+$GCLOUD compute firewall-rules create allow-debug --allow tcp:5555
+```
+
+Establish keys on master and workers.
+
+**CAUTION:** This will leave your private key on your master VM.
+It would be better to create a key just for inter-worker ssh.
+```
+for m in $ALL; do
+  echo "*** $m ***"
+  $GCLOUD beta compute scp \
+    --project=$PROJECT \
+    --zone=$ZONE \
+    ~/.ssh/google_compute_engine.pub $m:~/.ssh/
+done
+$GCLOUD beta compute scp \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  ~/.ssh/google_compute_engine $MASTER:~/.ssh/
+```
+
+Collect IP addresses for the master and workers:
+```
+MASTER_EXT_IP=$($GCLOUD compute instances describe \
+ --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER | grep natIP: | sed 's/[ ]*natIP:[ ]*//')
+MASTER_IP=$($GCLOUD compute instances describe \
+ --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER | grep networkIP: | sed 's/[ ]*networkIP:[ ]*//')
+WORKER_IPS=""
+for m in $WORKERS; do
+  echo "*** $m ***"
+  WORKER_IP=$($GCLOUD compute instances describe \
+    --project=$PROJECT \
+    --zone=$ZONE \
+    $m | grep networkIP: | sed 's/[ ]*networkIP:[ ]*//')
+  WORKER_IPS="$WORKER_IPS $WORKER_IP"
+done
+```
+
+Configure Flink:
+```
+cat $FLINK/conf/flink-conf.yaml \
+  | sed "s|.*\(jobmanager.rpc.address\):.*|\1: $MASTER_IP|g" \
+  | sed "s|.*\(jobmanager.heap.mb\):.*|\1: 4096|g" \
+  | sed "s|.*\(taskmanager.heap.mb\):.*|\1: 8192|g" \
+  | sed "s|.*\(parallelism.default\):.*|\1: $(($NUM_WORKERS * 4))|g" \
+  | sed "s|.*\(fs.hdfs.hadoopconf\):.*|\1: /etc/hadoop/conf|g" \
+  | sed "s|.*\(taskmanager.numberOfTaskSlots\):.*|\1: 4|g" \
+  | sed "s|.*\(jobmanager.web.submit.enable\):.*|\1: false|g" \
+  | sed "s|.*\(env.ssh.opts\):.*||g" \
+  > ~/flink-conf.yaml
+cat $FLINK/conf/log4j.properties \
+  | sed "s|.*\(log4j.rootLogger\)=.*|\1=ERROR, file|g" \
+  > ~/log4j.properties
+echo "env.ssh.opts: -i /home/$USER/.ssh/google_compute_engine -o StrictHostKeyChecking=no" >> ~/flink-conf.yaml
+echo "$MASTER_IP:8081" > ~/masters
+echo -n > ~/slaves
+for ip in $WORKER_IPS; do
+  echo $ip >> ~/slaves
+done
+cp -f \
+  ~/flink-conf.yaml \
+  ~/masters ~/slaves \
+  ~/log4j.properties \
+  $FLINK/conf/
+```
+
+Package configured Flink for distribution to workers:
+```
+( cd ~/ && tar -cvzf ~/flink.tgz $FLINK/* )
+```
+
+Distribute:
+```
+$GSUTIL cp ~/flink.tgz gs://nexmark
+for m in $ALL; do
+  echo "*** $m ***"
+  $GCLOUD compute ssh \
+    --project=$PROJECT \
+    --zone=$ZONE \
+    $m \
+    --command 'gsutil cp gs://nexmark/flink.tgz ~/ && tar -xvzf ~/flink.tgz'
+done
+```
+
+Start the Flink cluster:
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command "~/$FLINK_VER/bin/start-cluster.sh"
+```
+
+Bring up the Flink monitoring UI:
+```
+/usr/bin/google-chrome $MASTER_EXT_IP:8081 &
+```
+
+## Run NexMark
+
+Distribute the Beam + NexMark jar to all workers:
+```
+$GSUTIL cp $BEAM/integration/java/target/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar gs://nexmark
+for m in $ALL; do
+  echo "*** $m ***"
+  $GCLOUD compute ssh \
+    --project=$PROJECT \
+    --zone=$ZONE \
+    $m \
+    --command "gsutil cp gs://nexmark/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar ~/$FLINK_VER/lib/"
+done
+```
+
+Create a Pubsub topic and subscription for testing:
+```
+$GCLOUD alpha pubsub \
+  --project=$PROJECT \
+  topics create flink_test
+
+$GCLOUD alpha pubsub \
+  --project=$PROJECT \
+  subscriptions create flink_test \
+  --topic flink_test \
+  --ack-deadline=60 \
+  --topic-project=$PROJECT
+```
+
+Launch!
+
+**NOTE:** As of flink-1.0.3 this will throw a `NullPointerException`
+in `org.apache.beam.sdk.io.PubsubUnboundedSink$WriterFn.startBundle`.
+See Jira issue [BEAM-196](https://issues.apache.org/jira/browse/BEAM-196).
+
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command "~/$FLINK_VER/bin/flink run \
+  -c org.apache.beam.integration.nexmark.NexmarkFlinkDriver \
+  ~/$FLINK_VER/lib/java-integration-all-bundled-0.2.0-incubating-SNAPSHOT.jar \
+  --project=$PROJECT \
+  --streaming=true \
+  --query=0 \
+  --sourceType=PUBSUB \
+  --pubSubMode=COMBINED \
+  --pubsubTopic=flink_test \
+  --resourceNameMode=VERBATIM \
+  --manageResources=false \
+  --monitorJobs=false \
+  --numEventGenerators=5 \
+  --firstEventRate=1000 \
+  --nextEventRate=1000 \
+  --isRateLimited=true \
+  --numEvents=0 \
+  --useWallclockEventTime=true \
+  --usePubsubPublishTime=true"
+```
+
+## Tear down the cluster
+
+Stop the Flink cluster:
+```
+$GCLOUD compute ssh \
+  --project=$PROJECT \
+  --zone=$ZONE \
+  $MASTER \
+  --command "~/$FLINK_VER/bin/stop-cluster.sh"
+```
+
+Tear down the Dataproc cluster:
+```
+$GCLOUD beta dataproc clusters delete \
+  --project=$PROJECT \
+  flink
+```

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
new file mode 100644
index 0000000..ce2184b
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Bid.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * A bid for an item on auction.
+ */
+public class Bid implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /**
+   * Coder for {@link Bid}. Fields are written in the order auction, bidder, price,
+   * dateTime, extra, and must be read back in that same order.
+   */
+  public static final Coder<Bid> CODER = new AtomicCoder<Bid>() {
+    @Override
+    public void encode(Bid value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      // Every field is written NESTED; the supplied context is not consulted.
+      LONG_CODER.encode(value.auction, outStream, Context.NESTED);
+      LONG_CODER.encode(value.bidder, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      LONG_CODER.encode(value.dateTime, outStream, Context.NESTED);
+      STRING_CODER.encode(value.extra, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Bid decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long auction = LONG_CODER.decode(inStream, Context.NESTED);
+      long bidder = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      long dateTime = LONG_CODER.decode(inStream, Context.NESTED);
+      String extra = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Bid(auction, bidder, price, dateTime, extra);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending price then descending time
+   * (for finding winning bids). The greatest bid under this ordering has the
+   * highest price and, among bids of equal price, the earliest time.
+   */
+  public static final Comparator<Bid> PRICE_THEN_DESCENDING_TIME = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      // Prices are longs: compare them as longs. Widening to double would lose precision
+      // above 2^53 and could report two distinct prices as equal.
+      int i = Long.compare(left.price, right.price);
+      if (i != 0) {
+        return i;
+      }
+      // Operands are swapped so that later bids sort first.
+      return Long.compare(right.dateTime, left.dateTime);
+    }
+  };
+
+  /**
+   * Comparator to order bids by ascending time then ascending price
+   * (for finding most recent bids).
+   */
+  public static final Comparator<Bid> ASCENDING_TIME_THEN_PRICE = new Comparator<Bid>() {
+    @Override
+    public int compare(Bid left, Bid right) {
+      int i = Long.compare(left.dateTime, right.dateTime);
+      if (i != 0) {
+        return i;
+      }
+      // Compare prices as longs to avoid the precision loss of a double comparison.
+      return Long.compare(left.price, right.price);
+    }
+  };
+
+  /** Id of auction this bid is for. */
+  @JsonProperty
+  public final long auction; // foreign key: Auction.id
+
+  /** Id of person bidding in auction. */
+  @JsonProperty
+  public final long bidder; // foreign key: Person.id
+
+  /** Price of bid, in cents. */
+  @JsonProperty
+  public final long price;
+
+  /**
+   * Instant at which bid was made (ms since epoch).
+   * NOTE: This may be earlier than the system's event time.
+   */
+  @JsonProperty
+  public final long dateTime;
+
+  /** Additional arbitrary payload for performance testing. */
+  @JsonProperty
+  public final String extra;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Bid() {
+    auction = 0;
+    bidder = 0;
+    price = 0;
+    dateTime = 0;
+    extra = null;
+  }
+
+  /**
+   * Construct a bid.
+   *
+   * @param auction id of the auction being bid on
+   * @param bidder id of the person bidding
+   * @param price price of the bid, in cents
+   * @param dateTime instant at which the bid was made (ms since epoch)
+   * @param extra additional arbitrary payload
+   */
+  public Bid(long auction, long bidder, long price, long dateTime, String extra) {
+    this.auction = auction;
+    this.bidder = bidder;
+    this.price = price;
+    this.dateTime = dateTime;
+    this.extra = extra;
+  }
+
+  /**
+   * Return a copy of this bid which captures the given annotation by prefixing
+   * {@code extra} with {@code annotation + ": "}. (Used for debugging.)
+   */
+  public Bid withAnnotation(String annotation) {
+    return new Bid(auction, bidder, price, dateTime, annotation + ": " + extra);
+  }
+
+  /**
+   * Does bid have {@code annotation}? (Used for debugging.)
+   * True when {@code extra} starts with {@code annotation + ": "}.
+   */
+  public boolean hasAnnotation(String annotation) {
+    return extra.startsWith(annotation + ": ");
+  }
+
+  /**
+   * Remove {@code annotation} from bid. (Used for debugging.)
+   * Returns this same bid unchanged if it does not carry the annotation.
+   */
+  public Bid withoutAnnotation(String annotation) {
+    if (hasAnnotation(annotation)) {
+      // Skip the annotation itself plus the two-character ": " separator.
+      return new Bid(auction, bidder, price, dateTime, extra.substring(annotation.length() + 2));
+    } else {
+      return this;
+    }
+  }
+
+  /**
+   * Return 8 bytes for each of the four long fields, plus the length of {@code extra}
+   * in chars, plus one.
+   */
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8 + 8 + 8 + extra.length() + 1;
+  }
+
+  /**
+   * Render this object as a string using {@code NexmarkUtils.MAPPER}.
+   *
+   * @throws RuntimeException wrapping any {@link JsonProcessingException} from the mapper
+   */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
new file mode 100644
index 0000000..cfdd170
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BidsPerSession.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of query 11.
+ */
+public class BidsPerSession implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+
+  public static final Coder<BidsPerSession> CODER = new AtomicCoder<BidsPerSession>() {
+    @Override
+    public void encode(BidsPerSession value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.personId, outStream, Context.NESTED);
+      LONG_CODER.encode(value.bidsPerSession, outStream, Context.NESTED);
+    }
+
+    @Override
+    public BidsPerSession decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long personId = LONG_CODER.decode(inStream, Context.NESTED);
+      long bidsPerSession = LONG_CODER.decode(inStream, Context.NESTED);
+      return new BidsPerSession(personId, bidsPerSession);
+    }
+  };
+
+  @JsonProperty
+  public final long personId;
+
+  @JsonProperty
+  public final long bidsPerSession;
+
+  public BidsPerSession() {
+    personId = 0;
+    bidsPerSession = 0;
+  }
+
+  public BidsPerSession(long personId, long bidsPerSession) {
+    this.personId = personId;
+    this.bidsPerSession = bidsPerSession;
+  }
+
+  @Override
+  public long sizeInBytes() {
+    // Two longs.
+    return 8 + 8;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
new file mode 100644
index 0000000..f6cc16a
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/BoundedEventSource.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * A custom, bounded source of event records.
+ *
+ * <p>Events come from a {@link Generator} driven by a {@link GeneratorConfig}. The source
+ * is split into {@code numEventGenerators} sub-sources up front, and a running reader can
+ * additionally give away the tail of its event id range via {@code splitAtFraction}.
+ */
+class BoundedEventSource extends BoundedSource<Event> {
+  /** Configuration we generate events against. */
+  private final GeneratorConfig config;
+
+  /** How many bounded sources to create. */
+  private final int numEventGenerators;
+
+  public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
+    this.config = config;
+    this.numEventGenerators = numEventGenerators;
+  }
+
+  /** A reader to pull events from the generator. */
+  private static class EventReader extends BoundedReader<Event> {
+    /**
+     * Event source we are purporting to be reading from.
+     * (We can't use Java's capture-outer-class pointer since we must update
+     * this field on calls to splitAtFraction.)
+     */
+    private BoundedEventSource source;
+
+    /** Generator we are reading from. */
+    private final Generator generator;
+
+    /** Whether we have already logged that the generator ran out of events. */
+    private boolean reportedStop;
+
+    /** Most recently generated event, or null if no event has been generated yet. */
+    @Nullable
+    private TimestampedValue<Event> currentEvent;
+
+    public EventReader(BoundedEventSource source, GeneratorConfig config) {
+      this.source = source;
+      generator = new Generator(config);
+      reportedStop = false;
+    }
+
+    @Override
+    public synchronized boolean start() {
+      NexmarkUtils.info("starting bounded generator %s", generator);
+      return advance();
+    }
+
+    @Override
+    public synchronized boolean advance() {
+      if (!generator.hasNext()) {
+        // No more events. Log the stop only once, however often we are polled.
+        if (!reportedStop) {
+          reportedStop = true;
+          NexmarkUtils.info("stopped bounded generator %s", generator);
+        }
+        return false;
+      }
+      currentEvent = generator.next();
+      return true;
+    }
+
+    @Override
+    public synchronized Event getCurrent() throws NoSuchElementException {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getValue();
+    }
+
+    @Override
+    public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
+      if (currentEvent == null) {
+        throw new NoSuchElementException();
+      }
+      return currentEvent.getTimestamp();
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Nothing to close.
+    }
+
+    @Override
+    public synchronized Double getFractionConsumed() {
+      return generator.getFractionConsumed();
+    }
+
+    @Override
+    public synchronized BoundedSource<Event> getCurrentSource() {
+      return source;
+    }
+
+    /**
+     * Try to split the event id range of the current generator at {@code fraction} of its
+     * length.
+     *
+     * @param fraction position of the split within the generator's current
+     *     [start event id, stop event id) range
+     * @return a source covering the event ids from the split point onwards, or null if the
+     *     split point has already been passed or would leave one side empty
+     */
+    @Override
+    @Nullable
+    public synchronized BoundedEventSource splitAtFraction(double fraction) {
+      long startId = generator.getCurrentConfig().getStartEventId();
+      long stopId = generator.getCurrentConfig().getStopEventId();
+      long size = stopId - startId;
+      // Convert to long, not int: an int conversion saturates at Integer.MAX_VALUE and
+      // would put the split at the wrong event id for ranges larger than that.
+      long splitEventId = startId + Math.min((long) (size * fraction), size);
+      if (splitEventId <= generator.getNextEventId() || splitEventId == stopId) {
+        // Already passed this position or split results in left or right being empty.
+        NexmarkUtils.info("split failed for bounded generator %s at %f", generator, fraction);
+        return null;
+      }
+
+      NexmarkUtils.info("about to split bounded generator %s at %d", generator, splitEventId);
+
+      // Scale back the event space of the current generator, and return a generator config
+      // representing the event space we just 'stole' from the current generator.
+      GeneratorConfig remainingConfig = generator.splitAtEventId(splitEventId);
+
+      NexmarkUtils.info("split bounded generator into %s and %s", generator, remainingConfig);
+
+      // At this point
+      //   generator.events() ++ new Generator(remainingConfig).events()
+      //   == originalGenerator.events()
+
+      // We need a new source to represent the now smaller key space for this reader, so
+      // that we can maintain the invariant that
+      //   this.getCurrentSource().createReader(...)
+      // will yield the same output as this.
+      source = new BoundedEventSource(generator.getCurrentConfig(), source.numEventGenerators);
+
+      // Return a source from which we may read the 'stolen' event space.
+      return new BoundedEventSource(remainingConfig, source.numEventGenerators);
+    }
+  }
+
+  /**
+   * Split this source into sub-sources, one per sub-configuration returned by
+   * {@code config.split(numEventGenerators)}. Each sub-source is itself marked as
+   * unsplittable by giving it a generator count of one.
+   */
+  @Override
+  public List<BoundedEventSource> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    NexmarkUtils.info(
+        "splitting bounded source %s into %d sub-sources", config, numEventGenerators);
+    List<BoundedEventSource> results = new ArrayList<>();
+    // Ignore desiredBundleSizeBytes and use numEventGenerators instead.
+    for (GeneratorConfig subConfig : config.split(numEventGenerators)) {
+      results.add(new BoundedEventSource(subConfig, 1));
+    }
+    return results;
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    return config.getEstimatedSizeBytes();
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) {
+    return false;
+  }
+
+  @Override
+  public EventReader createReader(PipelineOptions options) {
+    NexmarkUtils.info("creating initial bounded reader for %s", config);
+    return new EventReader(this, config);
+  }
+
+  @Override
+  public void validate() {
+    // Nothing to validate.
+  }
+
+  @Override
+  public Coder<Event> getDefaultOutputCoder() {
+    return Event.CODER;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
new file mode 100644
index 0000000..ab5d92d
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/CategoryPrice.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of {@link Query4}.
+ *
+ * <p>An immutable triple of {@link #category}, {@link #price} and {@link #isLast}. Instances are
+ * encoded by {@link #CODER} and rendered by {@link #toString} through
+ * {@code NexmarkUtils.MAPPER}.
+ */
+public class CategoryPrice implements KnownSize, Serializable {
+  private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  /**
+   * Coder which writes {@code category}, {@code price} and {@code isLast} (as 1 or 0), in that
+   * order. Every field is encoded and decoded in the nested context, whatever context the
+   * caller supplies.
+   */
+  public static final Coder<CategoryPrice> CODER = new AtomicCoder<CategoryPrice>() {
+    @Override
+    public void encode(CategoryPrice value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      LONG_CODER.encode(value.category, outStream, Context.NESTED);
+      LONG_CODER.encode(value.price, outStream, Context.NESTED);
+      INT_CODER.encode(value.isLast ? 1 : 0, outStream, Context.NESTED);
+    }
+
+    @Override
+    public CategoryPrice decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      long category = LONG_CODER.decode(inStream, Context.NESTED);
+      long price = LONG_CODER.decode(inStream, Context.NESTED);
+      // Mirror encode(), which always writes the flag in the nested context.
+      boolean isLast = INT_CODER.decode(inStream, Context.NESTED) != 0;
+      return new CategoryPrice(category, price, isLast);
+    }
+  };
+
+  /** Category identifier. */
+  @JsonProperty
+  public final long category;
+
+  /** Price in cents. */
+  @JsonProperty
+  public final long price;
+
+  /**
+   * Flag carried alongside the price.
+   * NOTE(review): presumably marks the final result for the category; confirm against Query4.
+   */
+  @JsonProperty
+  public final boolean isLast;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private CategoryPrice() {
+    category = 0;
+    price = 0;
+    isLast = false;
+  }
+
+  /**
+   * Create a result with the given field values.
+   *
+   * @param category value for {@link #category}
+   * @param price value for {@link #price}, in cents
+   * @param isLast value for {@link #isLast}
+   */
+  public CategoryPrice(long category, long price, boolean isLast) {
+    this.category = category;
+    this.price = price;
+    this.isLast = isLast;
+  }
+
+  /**
+   * Returns a fixed estimate of 17 bytes: 8 for each of the two {@code long} fields plus 1 for
+   * the flag. The estimate does not depend on the field values.
+   */
+  @Override
+  public long sizeInBytes() {
+    return 8 + 8 + 1;
+  }
+
+  /**
+   * Returns this object serialized by {@code NexmarkUtils.MAPPER}.
+   *
+   * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
+   */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java
new file mode 100644
index 0000000..659da44
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Done.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Result of query 10.
+ *
+ * <p>Wraps a single {@link #message} string. Instances are encoded by {@link #CODER} and
+ * rendered by {@link #toString} through {@code NexmarkUtils.MAPPER}.
+ */
+public class Done implements KnownSize, Serializable {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+
+  /** Coder which writes {@code message} as a string in the nested context. */
+  public static final Coder<Done> CODER = new AtomicCoder<Done>() {
+    @Override
+    public void encode(Done value, OutputStream outStream,
+        Coder.Context context)
+        throws CoderException, IOException {
+      STRING_CODER.encode(value.message, outStream, Context.NESTED);
+    }
+
+    @Override
+    public Done decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      String message = STRING_CODER.decode(inStream, Context.NESTED);
+      return new Done(message);
+    }
+  };
+
+  /** The message; {@code null} only when built by the no-argument constructor. */
+  @JsonProperty
+  public final String message;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  public Done() {
+    message = null;
+  }
+
+  /**
+   * Create a result carrying {@code message}.
+   *
+   * @param message value for {@link #message}
+   */
+  public Done(String message) {
+    this.message = message;
+  }
+
+  /**
+   * Returns the length of {@link #message} as reported by {@link String#length()}, or 0 when
+   * the message is {@code null}.
+   *
+   * <p>Note that {@code String.length()} counts {@code char}s rather than encoded bytes, so
+   * this is an approximation for non-ASCII messages.
+   */
+  @Override
+  public long sizeInBytes() {
+    // message is null when the instance came from the no-argument (Avro) constructor.
+    return message == null ? 0 : message.length();
+  }
+
+  /**
+   * Returns this object serialized by {@code NexmarkUtils.MAPPER}.
+   *
+   * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
+   */
+  @Override
+  public String toString() {
+    try {
+      return NexmarkUtils.MAPPER.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1f08970a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java
----------------------------------------------------------------------
diff --git a/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java
new file mode 100644
index 0000000..a382b8e
--- /dev/null
+++ b/integration/java/src/main/java/org/apache/beam/integration/nexmark/Event.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.integration.nexmark;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+import javax.annotation.Nullable;
+
+/**
+ * An event in the auction system, either a (new) {@link Person}, a (new) {@link Auction},
+ * or a {@link Bid}.
+ *
+ * <p>Each public constructor stores its argument in one of {@link #newPerson},
+ * {@link #newAuction} and {@link #bid} and sets the other two to {@code null}. An instance with
+ * all three fields {@code null} (as left by the private no-argument constructor kept for Avro)
+ * is treated as invalid: {@link #CODER} and the instance methods of this class throw a
+ * {@link RuntimeException} for it.
+ */
+public class Event implements KnownSize, Serializable {
+  private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+  /**
+   * Coder which writes an integer tag (0 for a person, 1 for an auction, 2 for a bid) followed
+   * by the payload encoded with {@code Person.CODER}, {@code Auction.CODER} or
+   * {@code Bid.CODER} respectively. Tag and payload are both encoded and decoded in the nested
+   * context, whatever context the caller supplies.
+   */
+  public static final Coder<Event> CODER = new AtomicCoder<Event>() {
+    @Override
+    public void encode(Event value, OutputStream outStream, Coder.Context context)
+        throws CoderException, IOException {
+      if (value.newPerson != null) {
+        INT_CODER.encode(0, outStream, Context.NESTED);
+        Person.CODER.encode(value.newPerson, outStream, Context.NESTED);
+      } else if (value.newAuction != null) {
+        INT_CODER.encode(1, outStream, Context.NESTED);
+        Auction.CODER.encode(value.newAuction, outStream, Context.NESTED);
+      } else if (value.bid != null) {
+        INT_CODER.encode(2, outStream, Context.NESTED);
+        Bid.CODER.encode(value.bid, outStream, Context.NESTED);
+      } else {
+        throw new RuntimeException("invalid event");
+      }
+    }
+
+    @Override
+    public Event decode(
+        InputStream inStream, Coder.Context context)
+        throws CoderException, IOException {
+      // Mirror encode(), which always writes the tag in the nested context.
+      int tag = INT_CODER.decode(inStream, Context.NESTED);
+      if (tag == 0) {
+        Person person = Person.CODER.decode(inStream, Context.NESTED);
+        return new Event(person);
+      } else if (tag == 1) {
+        Auction auction = Auction.CODER.decode(inStream, Context.NESTED);
+        return new Event(auction);
+      } else if (tag == 2) {
+        Bid bid = Bid.CODER.decode(inStream, Context.NESTED);
+        return new Event(bid);
+      } else {
+        throw new RuntimeException("invalid event encoding");
+      }
+    }
+  };
+
+  /** The new person, or {@code null} if this event is not a person event. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Person newPerson;
+
+  /** The new auction, or {@code null} if this event is not an auction event. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Auction newAuction;
+
+  /** The bid, or {@code null} if this event is not a bid event. */
+  @Nullable
+  @org.apache.avro.reflect.Nullable
+  public final Bid bid;
+
+  // For Avro only.
+  @SuppressWarnings("unused")
+  private Event() {
+    newPerson = null;
+    newAuction = null;
+    bid = null;
+  }
+
+  /** Create an event holding {@code newPerson}; the other two fields are {@code null}. */
+  public Event(Person newPerson) {
+    this.newPerson = newPerson;
+    newAuction = null;
+    bid = null;
+  }
+
+  /** Create an event holding {@code newAuction}; the other two fields are {@code null}. */
+  public Event(Auction newAuction) {
+    newPerson = null;
+    this.newAuction = newAuction;
+    bid = null;
+  }
+
+  /** Create an event holding {@code bid}; the other two fields are {@code null}. */
+  public Event(Bid bid) {
+    newPerson = null;
+    newAuction = null;
+    this.bid = bid;
+  }
+
+  /**
+   * Return a copy of event which captures {@code annotation}.
+   * (Used for debugging).
+   *
+   * @throws RuntimeException if none of the three payload fields is set
+   */
+  public Event withAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withAnnotation(annotation));
+    } else if (bid != null) {
+      return new Event(bid.withAnnotation(annotation));
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Does event have {@code annotation}? (Used for debugging.)
+   *
+   * @throws RuntimeException if none of the three payload fields is set
+   */
+  public boolean hasAnnotation(String annotation) {
+    if (newPerson != null) {
+      return newPerson.hasAnnotation(annotation);
+    } else if (newAuction != null) {
+      return newAuction.hasAnnotation(annotation);
+    } else if (bid != null) {
+      return bid.hasAnnotation(annotation);
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Remove {@code annotation} from event. (Used for debugging.)
+   *
+   * @return a new event wrapping the payload's {@code withoutAnnotation} result
+   * @throws RuntimeException if none of the three payload fields is set
+   */
+  public Event withoutAnnotation(String annotation) {
+    if (newPerson != null) {
+      return new Event(newPerson.withoutAnnotation(annotation));
+    } else if (newAuction != null) {
+      return new Event(newAuction.withoutAnnotation(annotation));
+    } else if (bid != null) {
+      return new Event(bid.withoutAnnotation(annotation));
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Returns one plus the payload's own {@code sizeInBytes()}.
+   *
+   * @throws RuntimeException if none of the three payload fields is set
+   */
+  @Override
+  public long sizeInBytes() {
+    if (newPerson != null) {
+      return 1 + newPerson.sizeInBytes();
+    } else if (newAuction != null) {
+      return 1 + newAuction.sizeInBytes();
+    } else if (bid != null) {
+      return 1 + bid.sizeInBytes();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+
+  /**
+   * Returns the payload's own string form.
+   *
+   * @throws RuntimeException if none of the three payload fields is set
+   */
+  @Override
+  public String toString() {
+    if (newPerson != null) {
+      return newPerson.toString();
+    } else if (newAuction != null) {
+      return newAuction.toString();
+    } else if (bid != null) {
+      return bid.toString();
+    } else {
+      throw new RuntimeException("invalid event");
+    }
+  }
+}