You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/08/23 17:09:51 UTC

[43/55] [abbrv] beam git commit: Move module beam-integration-java-nexmark to beam-sdks-java-nexmark

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
new file mode 100644
index 0000000..6f4ad56
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/AbstractSimulator.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Abstract base class for the simulator of a query.
+ *
+ * @param <InputT> Type of input elements.
+ * @param <OutputT> Type of output elements.
+ */
+public abstract class AbstractSimulator<InputT, OutputT> {
+  /** Window size for action bucket sampling. */
+  private static final Duration WINDOW_SIZE = Duration.standardMinutes(1);
+
+  /** Input event stream we should draw from. */
+  private final Iterator<TimestampedValue<InputT>> input;
+
+  /** Set to true when no more results. */
+  private boolean isDone;
+
+  /**
+   * Results which have not yet been returned by the {@link #results} iterator.
+   */
+  private final List<TimestampedValue<OutputT>> pendingResults;
+
+  /**
+   * Current window timestamp (ms since epoch).
+   */
+  private long currentWindow;
+
+  /**
+   * Number of (possibly intermediate) results for the current window.
+   */
+  private long currentCount;
+
+  /**
+   * Result counts per window which have not yet been returned by the {@link #resultsPerWindow}
+   * iterator.
+   */
+  private final List<Long> pendingCounts;
+
+  public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
+    this.input = input;
+    isDone = false;
+    pendingResults = new ArrayList<>();
+    currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    currentCount = 0;
+    pendingCounts = new ArrayList<>();
+  }
+
+  /** Called by implementors of {@link #run}: Fetch the next input element. */
+  @Nullable
+  TimestampedValue<InputT> nextInput() {
+    if (!input.hasNext()) {
+      return null;
+    }
+    TimestampedValue<InputT> timestampedInput = input.next();
+    NexmarkUtils.info("input: %s", timestampedInput);
+    return timestampedInput;
+  }
+
+  /**
+   * Called by implementors of {@link #run}:  Capture an intermediate result, for the purpose of
+   * recording the expected activity of the query over time.
+   */
+  void addIntermediateResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("intermediate result: %s", result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Called by implementors of {@link #run}: Capture a final result, for the purpose of checking
+   * semantic correctness.
+   */
+  void addResult(TimestampedValue<OutputT> result) {
+    NexmarkUtils.info("result: %s", result);
+    pendingResults.add(result);
+    updateCounts(result.getTimestamp());
+  }
+
+  /**
+   * Update window and counts.
+   */
+  private void updateCounts(Instant timestamp) {
+    long window = timestamp.getMillis() - timestamp.getMillis() % WINDOW_SIZE.getMillis();
+    if (window > currentWindow) {
+      if (currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
+        pendingCounts.add(currentCount);
+      }
+      currentCount = 0;
+      currentWindow = window;
+    }
+    currentCount++;
+  }
+
+  /** Called by implementors of {@link #run}: Record that no more results will be emitted. */
+  void allDone() {
+    isDone = true;
+  }
+
+  /**
+   * Overridden by derived classes to do the next increment of work. Each call should
+   * call one or more of {@link #nextInput}, {@link #addIntermediateResult}, {@link #addResult}
+   * or {@link #allDone}. It is ok for a single call to emit more than one result via
+   * {@link #addResult}. It is ok for a single call to run the entire simulation, though
+   * this will prevent the {@link #results} and {@link #resultsPerWindow} iterators to
+   * stall.
+   */
+  protected abstract void run();
+
+  /**
+   * Return iterator over all expected timestamped results. The underlying simulator state is
+   * changed. Only one of {@link #results} or {@link #resultsPerWindow} can be called.
+   */
+  public Iterator<TimestampedValue<OutputT>> results() {
+    return new Iterator<TimestampedValue<OutputT>>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingResults.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            return false;
+          }
+          run();
+        }
+      }
+
+      @Override
+      public TimestampedValue<OutputT> next() {
+        TimestampedValue<OutputT> result = pendingResults.get(0);
+        pendingResults.remove(0);
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Return an iterator over the number of results per {@link #WINDOW_SIZE} period. The underlying
+   * simulator state is changed.  Only one of {@link #results} or {@link #resultsPerWindow} can be
+   * called.
+   */
+  public Iterator<Long> resultsPerWindow() {
+    return new Iterator<Long>() {
+      @Override
+      public boolean hasNext() {
+        while (true) {
+          if (!pendingCounts.isEmpty()) {
+            return true;
+          }
+          if (isDone) {
+            if (currentCount > 0) {
+              pendingCounts.add(currentCount);
+              currentCount = 0;
+              currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+              return true;
+            } else {
+              return false;
+            }
+          }
+          run();
+        }
+      }
+
+      @Override
+      public Long next() {
+        Long result = pendingCounts.get(0);
+        pendingCounts.remove(0);
+        return result;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
new file mode 100644
index 0000000..d070058
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQuery.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.Monitor;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Base class for the eight 'NEXMark' queries. Supplies some fragments common to
+ * multiple queries.
+ */
+public abstract class NexmarkQuery
+    extends PTransform<PCollection<Event>, PCollection<TimestampedValue<KnownSize>>> {
+  public static final TupleTag<Auction> AUCTION_TAG = new TupleTag<>("auctions");
+  public static final TupleTag<Bid> BID_TAG = new TupleTag<>("bids");
+  static final TupleTag<Person> PERSON_TAG = new TupleTag<>("person");
+
+  /** Predicate to detect a new person event. */
+  private static final SerializableFunction<Event, Boolean> IS_NEW_PERSON =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.newPerson != null;
+        }
+      };
+
+  /** DoFn to convert a new person event to a person. */
+  private static final DoFn<Event, Person> AS_PERSON = new DoFn<Event, Person>() {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().newPerson);
+    }
+  };
+
+  /** Predicate to detect a new auction event. */
+  private static final SerializableFunction<Event, Boolean> IS_NEW_AUCTION =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.newAuction != null;
+        }
+      };
+
+  /** DoFn to convert a new auction event to an auction. */
+  private static final DoFn<Event, Auction> AS_AUCTION = new DoFn<Event, Auction>() {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().newAuction);
+    }
+  };
+
+  /** Predicate to detect a new bid event. */
+  private static final SerializableFunction<Event, Boolean> IS_BID =
+      new SerializableFunction<Event, Boolean>() {
+        @Override
+        public Boolean apply(Event event) {
+          return event.bid != null;
+        }
+      };
+
+  /** DoFn to convert a bid event to a bid. */
+  private static final DoFn<Event, Bid> AS_BID = new DoFn<Event, Bid>() {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().bid);
+    }
+  };
+
+  /** Transform to key each person by their id. */
+  static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID =
+      ParDo.of(new DoFn<Person, KV<Long, Person>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().id, c.element()));
+             }
+           });
+
+  /** Transform to key each auction by its id. */
+  static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID =
+      ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().id, c.element()));
+             }
+           });
+
+  /** Transform to key each auction by its seller id. */
+  static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER =
+      ParDo.of(new DoFn<Auction, KV<Long, Auction>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().seller, c.element()));
+             }
+           });
+
+  /** Transform to key each bid by its auction id. */
+  static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION =
+      ParDo.of(new DoFn<Bid, KV<Long, Bid>>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(KV.of(c.element().auction, c.element()));
+             }
+           });
+
+  /** Transform to project the auction id from each bid. */
+  static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION =
+      ParDo.of(new DoFn<Bid, Long>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(c.element().auction);
+             }
+           });
+
+  /** Transform to project the price from each bid. */
+  static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE =
+      ParDo.of(new DoFn<Bid, Long>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               c.output(c.element().price);
+             }
+           });
+
+  /**
+   * Transform to emit each event with the timestamp embedded within it. An event whose bid,
+   * newPerson and newAuction fields are all null is dropped without producing any output.
+   */
+  public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA =
+      ParDo.of(new DoFn<Event, Event>() {
+             @ProcessElement
+             public void processElement(ProcessContext c) {
+               Event e = c.element();
+               if (e.bid != null) {
+                 c.outputWithTimestamp(e, new Instant(e.bid.dateTime));
+               } else if (e.newPerson != null) {
+                 c.outputWithTimestamp(e, new Instant(e.newPerson.dateTime));
+               } else if (e.newAuction != null) {
+                 c.outputWithTimestamp(e, new Instant(e.newAuction.dateTime));
+               }
+             }
+           });
+
+  /** Transform to filter for just the new auction events and unwrap each to its auction. */
+  public static final PTransform<PCollection<Event>, PCollection<Auction>> JUST_NEW_AUCTIONS =
+      new PTransform<PCollection<Event>, PCollection<Auction>>("justNewAuctions") {
+        @Override
+        public PCollection<Auction> expand(PCollection<Event> input) {
+          return input.apply("IsNewAuction", Filter.by(IS_NEW_AUCTION))
+                      .apply("AsAuction", ParDo.of(AS_AUCTION));
+        }
+      };
+
+  /** Transform to filter for just the new person events and unwrap each to its person. */
+  public static final PTransform<PCollection<Event>, PCollection<Person>> JUST_NEW_PERSONS =
+      new PTransform<PCollection<Event>, PCollection<Person>>("justNewPersons") {
+        @Override
+        public PCollection<Person> expand(PCollection<Event> input) {
+          return input.apply("IsNewPerson", Filter.by(IS_NEW_PERSON))
+                      .apply("AsPerson", ParDo.of(AS_PERSON));
+        }
+      };
+
+  /** Transform to filter for just the bid events and unwrap each to its bid. */
+  public static final PTransform<PCollection<Event>, PCollection<Bid>> JUST_BIDS =
+      new PTransform<PCollection<Event>, PCollection<Bid>>("justBids") {
+        @Override
+        public PCollection<Bid> expand(PCollection<Event> input) {
+          return input.apply("IsBid", Filter.by(IS_BID))
+                      .apply("AsBid", ParDo.of(AS_BID));
+        }
+      };
+
+  final NexmarkConfiguration configuration;
+  // The monitors and fatalCounter are null unless configuration.debug is set (see constructor).
+  public final Monitor<Event> eventMonitor;
+  public final Monitor<KnownSize> resultMonitor;
+  private final Monitor<Event> endOfStreamMonitor;
+  private final Counter fatalCounter;
+
+  /** Monitors and the fatal counter are created only if {@code configuration.debug} is set. */
+  NexmarkQuery(NexmarkConfiguration configuration, String name) {
+    super(name);
+    this.configuration = configuration;
+    if (configuration.debug) {
+      eventMonitor = new Monitor<>(name + ".Events", "event");
+      resultMonitor = new Monitor<>(name + ".Results", "result");
+      endOfStreamMonitor = new Monitor<>(name + ".EndOfStream", "end");
+      fatalCounter = Metrics.counter(name , "fatal");
+    } else {
+      eventMonitor = null;
+      resultMonitor = null;
+      endOfStreamMonitor = null;
+      fatalCounter = null;
+    }
+  }
+
+  /**
+   * Implement the actual query. All we know about the result is it has a known encoded size.
+   */
+  protected abstract PCollection<KnownSize> applyPrim(PCollection<Event> events);
+
+  /** Add optional monitoring and delays, run {@link #applyPrim}, then timestamp the results. */
+  @Override
+  public PCollection<TimestampedValue<KnownSize>> expand(PCollection<Event> events) {
+
+    if (configuration.debug) {
+      events =
+          events
+              // Monitor events as they go by.
+              .apply(name + ".Monitor", eventMonitor.getTransform())
+              // Count each type of event.
+              .apply(name + ".Snoop", NexmarkUtils.snoop(name));
+    }
+
+    if (configuration.cpuDelayMs > 0) {
+      // Slow down by pegging one core at 100%.
+      events = events.apply(name + ".CpuDelay",
+              NexmarkUtils.<Event>cpuDelay(name, configuration.cpuDelayMs));
+    }
+
+    if (configuration.diskBusyBytes > 0) {
+      // Slow down by forcing bytes to durable store.
+      events = events.apply(name + ".DiskBusy",
+              NexmarkUtils.<Event>diskBusy(configuration.diskBusyBytes));
+    }
+
+    // Run the query.
+    PCollection<KnownSize> queryResults = applyPrim(events);
+
+    if (configuration.debug) {
+      // Monitor results as they go by.
+      queryResults = queryResults.apply(name + ".Debug", resultMonitor.getTransform());
+    }
+
+    // Timestamp the query results.
+    return queryResults.apply(name + ".Stamp", NexmarkUtils.<KnownSize>stamp(name));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
new file mode 100644
index 0000000..1f093a0
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/NexmarkQueryModel.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+
+import org.hamcrest.core.IsEqual;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+
+/**
+ * Base class for models of the eight NEXMark queries. Provides an assertion function which can be
+ * applied against the actual query results to check their consistency with the model.
+ */
+public abstract class NexmarkQueryModel implements Serializable {
+  public final NexmarkConfiguration configuration;
+
+  NexmarkQueryModel(NexmarkConfiguration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   * Return the start of the most recent window of {@code size} and {@code period} which ends
+   * strictly before {@code timestamp}.
+   */
+  static Instant windowStart(Duration size, Duration period, Instant timestamp) {
+    long ts = timestamp.getMillis();
+    long p = period.getMillis();
+    long lim = ts - ts % p;
+    long s = size.getMillis();
+    return new Instant(lim - s);
+  }
+
+  /** Convert {@code itr} to strings capturing values, timestamps and order. */
+  static <T> List<String> toValueTimestampOrder(Iterator<TimestampedValue<T>> itr) {
+    List<String> strings = new ArrayList<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().toString());
+    }
+    return strings;
+  }
+
+  /** Convert {@code itr} to strings capturing values and order. */
+  static <T> List<String> toValueOrder(Iterator<TimestampedValue<T>> itr) {
+    List<String> strings = new ArrayList<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().getValue().toString());
+    }
+    return strings;
+  }
+
+  /** Convert {@code itr} to strings capturing values only. */
+  static <T> Set<String> toValue(Iterator<TimestampedValue<T>> itr) {
+    Set<String> strings = new HashSet<>();
+    while (itr.hasNext()) {
+      strings.add(itr.next().getValue().toString());
+    }
+    return strings;
+  }
+
+  /** Return simulator for query. */
+  public abstract AbstractSimulator<?, ?> simulator();
+
+  /** Return sub-sequence of results which are significant for model. */
+  Iterable<TimestampedValue<KnownSize>> relevantResults(
+      Iterable<TimestampedValue<KnownSize>> results) {
+    return results;
+  }
+
+  /**
+   * Convert iterator of elements to collection of strings to use when testing coherence of model
+   * against actual query results.
+   */
+  protected abstract <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr);
+
+  /** Return assertion to use on results of pipeline for this query. */
+  public SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void> assertionFor() {
+    final Collection<String> expectedStrings = toCollection(simulator().results());
+
+    return new SerializableFunction<Iterable<TimestampedValue<KnownSize>>, Void>() {
+      @Override
+      public Void apply(Iterable<TimestampedValue<KnownSize>> actual) {
+      Collection<String> actualStrings = toCollection(relevantResults(actual).iterator());
+        Assert.assertThat("wrong pipeline output", actualStrings,
+          IsEqual.equalTo(expectedStrings));
+        return null;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
new file mode 100644
index 0000000..68bf78e
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 0: Pass events through unchanged. However, force them to do a round trip through
+ * serialization so that we measure the impact of the choice of coders.
+ */
+public class Query0 extends NexmarkQuery {
+  public Query0(NexmarkConfiguration configuration) {
+    super(configuration, "Query0");
+  }
+
+  /** Pass every event through the input collection's coder, counting the encoded bytes. */
+  private PCollection<Event> applyTyped(PCollection<Event> events) {
+    final Coder<Event> coder = events.getCoder();
+    DoFn<Event, Event> roundTripFn =
+        new DoFn<Event, Event>() {
+          private final Counter bytesMetric = Metrics.counter(name, "bytes");
+
+          @ProcessElement
+          public void processElement(ProcessContext c) throws CoderException, IOException {
+            // Encode the element, record its encoded size, then emit the decoded copy.
+            ByteArrayOutputStream encoded = new ByteArrayOutputStream();
+            coder.encode(c.element(), encoded, Coder.Context.OUTER);
+            byte[] payload = encoded.toByteArray();
+            bytesMetric.inc((long) payload.length);
+            ByteArrayInputStream toDecode = new ByteArrayInputStream(payload);
+            c.output(coder.decode(toDecode, Coder.Context.OUTER));
+          }
+        };
+    // Force round trip through coder.
+    return events.apply(name + ".Serialize", ParDo.of(roundTripFn));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
new file mode 100644
index 0000000..0e73a21
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query0Model.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query0}.
+ */
+public class Query0Model extends NexmarkQueryModel {
+  /** Simulator for query 0: every input event is emitted unchanged as a result. */
+  private static class Simulator extends AbstractSimulator<Event, Event> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    @Override
+    protected void run() {
+      TimestampedValue<Event> next = nextInput();
+      if (next != null) {
+        // Each input is a final result as-is.
+        addResult(next);
+      } else {
+        allDone();
+      }
+    }
+  }
+
+  public Query0Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Delegates to {@code toValueTimestampOrder} to build the strings used for comparison. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
new file mode 100644
index 0000000..810cd87
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 1, 'Currency Conversion'. Convert each bid value from dollars to euros.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(auction, DOLTOEUR(price), bidder, datetime)
+ * FROM bid [ROWS UNBOUNDED];
+ * </pre>
+ *
+ * <p>To make things more interesting, allow the 'currency conversion' to be arbitrarily
+ * slowed down.
+ */
+public class Query1 extends NexmarkQuery {
+  public Query1(NexmarkConfiguration configuration) {
+    super(configuration, "Query1");
+  }
+
+  /** Keep only the bids and scale the price of each one by 89/100 (dollars to euros). */
+  private PCollection<Bid> applyTyped(PCollection<Event> events) {
+    // Discard everything except the bid events.
+    PCollection<Bid> bids = events.apply(JUST_BIDS);
+    // Apply the currency conversion to every bid.
+    return bids.apply(name + ".ToEuros",
+        ParDo.of(new DoFn<Bid, Bid>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Bid dollars = c.element();
+            c.output(new Bid(dollars.auction, dollars.bidder,
+                (dollars.price * 89) / 100, dollars.dateTime, dollars.extra));
+          }
+        }));
+  }
+
+  /** Run the typed query and return its output as a collection of {@link KnownSize}. */
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
new file mode 100644
index 0000000..1c4e443
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query10.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Done;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Query "10", 'Log to sharded files' (Not in original suite.)
+ *
+ * <p>Every windowSizeSec, save all events from the last period into 5*maxNumWorkers log shards.
+ */
+public class Query10 extends NexmarkQuery {
+  private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
+  private static final int CHANNEL_BUFFER = 8 << 20; // 8MB
+  private static final int NUM_SHARDS_PER_WORKER = 5;
+  private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds(10);
+
+  /**
+   * Capture everything we need to know about the records in a single output file.
+   */
+  private static class OutputFile implements Serializable {
+    /** Maximum possible timestamp of records in file. */
+    private final Instant maxTimestamp;
+    /** Shard within window. */
+    private final String shard;
+    /** Index of file in all files in shard. */
+    private final long index;
+    /** Timing of records in this file. */
+    private final PaneInfo.Timing timing;
+    /** Path to file containing records, or {@literal null} if no output required. */
+    @Nullable
+    private final String filename;
+
+    public OutputFile(
+        Instant maxTimestamp,
+        String shard,
+        long index,
+        PaneInfo.Timing timing,
+        @Nullable String filename) {
+      this.maxTimestamp = maxTimestamp;
+      this.shard = shard;
+      this.index = index;
+      this.timing = timing;
+      this.filename = filename;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s %s %d %s %s%n", maxTimestamp, shard, index, timing, filename);
+    }
+  }
+
+  /**
+   * GCS uri prefix for all log and 'finished' files. If null they won't be written.
+   */
+  @Nullable
+  private String outputPath;
+
+  /**
+   * Maximum number of workers, used to determine log sharding factor.
+   */
+  private int maxNumWorkers;
+
+  public Query10(NexmarkConfiguration configuration) {
+    super(configuration, "Query10");
+  }
+
+  public void setOutputPath(@Nullable String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public void setMaxNumWorkers(int maxNumWorkers) {
+    this.maxNumWorkers = maxNumWorkers;
+  }
+
+  /**
+   * Return channel for writing bytes to GCS.
+   */
+  private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename)
+      throws IOException {
+    //TODO
+    // Fix after PR: right now this is a specific Google added use case
+    // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way.
+    throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
+  }
+
+  /** Return a short string to describe {@code timing}. */
+  private String timingToString(PaneInfo.Timing timing) {
+    switch (timing) {
+      case EARLY:
+        return "E";
+      case ON_TIME:
+        return "O";
+      case LATE:
+        return "L";
+    }
+    throw new RuntimeException(); // cases are exhaustive
+  }
+
+  /** Construct an {@link OutputFile} for {@code pane} in {@code window} for {@code shard}. */
+  private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
+    @Nullable String filename =
+        outputPath == null
+        ? null
+        : String.format("%s/LOG-%s-%s-%03d-%s-%x",
+            outputPath, window.maxTimestamp(), shard, pane.getIndex(),
+            timingToString(pane.getTiming()),
+            ThreadLocalRandom.current().nextLong());
+    return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(),
+        pane.getTiming(), filename);
+  }
+
+  /**
+   * Return path to which we should write the index for {@code window}, or {@literal null}
+   * if no output required.
+   */
+  @Nullable
+  private String indexPathFor(BoundedWindow window) {
+    if (outputPath == null) {
+      return null;
+    }
+    return String.format("%s/INDEX-%s", outputPath, window.maxTimestamp());
+  }
+
+  private PCollection<Done> applyTyped(PCollection<Event> events) {
+    final int numLogShards = maxNumWorkers * NUM_SHARDS_PER_WORKER;
+
+    return events
+      .apply(name + ".ShardEvents",
+        ParDo.of(new DoFn<Event, KV<String, Event>>() {
+          private final Counter lateCounter = Metrics.counter(name , "actuallyLateEvent");
+          private final Counter onTimeCounter = Metrics.counter(name , "onTimeCounter");
+
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            if (c.element().hasAnnotation("LATE")) {
+              lateCounter.inc();
+              LOG.info("Observed late: %s", c.element());
+            } else {
+              onTimeCounter.inc();
+            }
+            int shardNum = (int) Math.abs((long) c.element().hashCode() % numLogShards);
+            String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
+            c.output(KV.of(shard, c.element()));
+          }
+        }))
+      .apply(name + ".WindowEvents",
+        Window.<KV<String, Event>>into(
+          FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+          .triggering(AfterEach.inOrder(
+              Repeatedly
+                  .forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents))
+                  .orFinally(AfterWatermark.pastEndOfWindow()),
+              Repeatedly.forever(
+                  AfterFirst.of(AfterPane.elementCountAtLeast(configuration.maxLogEvents),
+                      AfterProcessingTime.pastFirstElementInPane()
+                                         .plusDelayOf(LATE_BATCHING_PERIOD)))))
+          .discardingFiredPanes()
+          // Use a 1 day allowed lateness so that any forgotten hold will stall the
+          // pipeline for that period and be very noticeable.
+          .withAllowedLateness(Duration.standardDays(1)))
+      .apply(name + ".GroupByKey", GroupByKey.<String, Event>create())
+      .apply(name + ".CheckForLateEvents",
+        ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                 KV<String, Iterable<Event>>>() {
+          private final Counter earlyCounter = Metrics.counter(name , "earlyShard");
+          private final Counter onTimeCounter = Metrics.counter(name , "onTimeShard");
+          private final Counter lateCounter = Metrics.counter(name , "lateShard");
+          private final Counter unexpectedLatePaneCounter =
+            Metrics.counter(name , "ERROR_unexpectedLatePane");
+          private final Counter unexpectedOnTimeElementCounter =
+            Metrics.counter(name , "ERROR_unexpectedOnTimeElement");
+
+          @ProcessElement
+          public void processElement(ProcessContext c, BoundedWindow window) {
+            int numLate = 0;
+            int numOnTime = 0;
+            for (Event event : c.element().getValue()) {
+              if (event.hasAnnotation("LATE")) {
+                numLate++;
+              } else {
+                numOnTime++;
+              }
+            }
+            String shard = c.element().getKey();
+            LOG.info(String.format(
+                "%s with timestamp %s has %d actually late and %d on-time "
+                    + "elements in pane %s for window %s",
+                shard, c.timestamp(), numLate, numOnTime, c.pane(),
+                window.maxTimestamp()));
+            if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
+              if (numLate == 0) {
+                LOG.error(
+                    "ERROR! No late events in late pane for %s", shard);
+                unexpectedLatePaneCounter.inc();
+              }
+              if (numOnTime > 0) {
+                LOG.error(
+                    "ERROR! Have %d on-time events in late pane for %s",
+                    numOnTime, shard);
+                unexpectedOnTimeElementCounter.inc();
+              }
+              lateCounter.inc();
+            } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
+              if (numOnTime + numLate < configuration.maxLogEvents) {
+                LOG.error(
+                    "ERROR! Only have %d events in early pane for %s",
+                    numOnTime + numLate, shard);
+              }
+              earlyCounter.inc();
+            } else {
+              onTimeCounter.inc();
+            }
+            c.output(c.element());
+          }
+        }))
+      .apply(name + ".UploadEvents",
+        ParDo.of(new DoFn<KV<String, Iterable<Event>>,
+                 KV<Void, OutputFile>>() {
+          private final Counter savedFileCounter = Metrics.counter(name , "savedFile");
+          private final Counter writtenRecordsCounter = Metrics.counter(name , "writtenRecords");
+
+            @ProcessElement
+            public void processElement(ProcessContext c, BoundedWindow window)
+                    throws IOException {
+              String shard = c.element().getKey();
+              GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+              OutputFile outputFile = outputFileFor(window, shard, c.pane());
+              LOG.info(String.format(
+                  "Writing %s with record timestamp %s, window timestamp %s, pane %s",
+                  shard, c.timestamp(), window.maxTimestamp(), c.pane()));
+              if (outputFile.filename != null) {
+                LOG.info("Beginning write to '%s'", outputFile.filename);
+                int n = 0;
+                try (OutputStream output =
+                         Channels.newOutputStream(openWritableGcsFile(options, outputFile
+                             .filename))) {
+                  for (Event event : c.element().getValue()) {
+                    Event.CODER.encode(event, output, Coder.Context.OUTER);
+                    writtenRecordsCounter.inc();
+                    if (++n % 10000 == 0) {
+                      LOG.info("So far written %d records to '%s'", n,
+                          outputFile.filename);
+                    }
+                  }
+                }
+                LOG.info("Written all %d records to '%s'", n, outputFile.filename);
+              }
+              savedFileCounter.inc();
+              c.output(KV.<Void, OutputFile>of(null, outputFile));
+            }
+          }))
+      // Clear fancy triggering from above.
+      .apply(name + ".WindowLogFiles", Window.<KV<Void, OutputFile>>into(
+        FixedWindows.of(Duration.standardSeconds(configuration.windowSizeSec)))
+        .triggering(AfterWatermark.pastEndOfWindow())
+        // We expect no late data here, but we'll assume the worst so we can detect any.
+        .withAllowedLateness(Duration.standardDays(1))
+        .discardingFiredPanes())
+      // this GroupByKey allows to have one file per window
+      .apply(name + ".GroupByKey2", GroupByKey.<Void, OutputFile>create())
+      .apply(name + ".Index",
+        ParDo.of(new DoFn<KV<Void, Iterable<OutputFile>>, Done>() {
+          private final Counter unexpectedLateCounter =
+            Metrics.counter(name , "ERROR_unexpectedLate");
+          private final Counter unexpectedEarlyCounter =
+              Metrics.counter(name , "ERROR_unexpectedEarly");
+          private final Counter unexpectedIndexCounter =
+              Metrics.counter(name , "ERROR_unexpectedIndex");
+          private final Counter finalizedCounter = Metrics.counter(name , "indexed");
+
+          @ProcessElement
+          public void processElement(ProcessContext c, BoundedWindow window)
+                  throws IOException {
+            if (c.pane().getTiming() == Timing.LATE) {
+              unexpectedLateCounter.inc();
+              LOG.error("ERROR! Unexpected LATE pane: %s", c.pane());
+            } else if (c.pane().getTiming() == Timing.EARLY) {
+              unexpectedEarlyCounter.inc();
+              LOG.error("ERROR! Unexpected EARLY pane: %s", c.pane());
+            } else if (c.pane().getTiming() == Timing.ON_TIME
+                && c.pane().getIndex() != 0) {
+              unexpectedIndexCounter.inc();
+              LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane());
+            } else {
+              GcsOptions options = c.getPipelineOptions().as(GcsOptions.class);
+              LOG.info(
+                  "Index with record timestamp %s, window timestamp %s, pane %s",
+                  c.timestamp(), window.maxTimestamp(), c.pane());
+
+              @Nullable String filename = indexPathFor(window);
+              if (filename != null) {
+                LOG.info("Beginning write to '%s'", filename);
+                int n = 0;
+                try (OutputStream output =
+                         Channels.newOutputStream(
+                             openWritableGcsFile(options, filename))) {
+                  for (OutputFile outputFile : c.element().getValue()) {
+                    output.write(outputFile.toString().getBytes("UTF-8"));
+                    n++;
+                  }
+                }
+                LOG.info("Written all %d lines to '%s'", n, filename);
+              }
+              c.output(
+                  new Done("written for timestamp " + window.maxTimestamp()));
+              finalizedCounter.inc();
+            }
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
new file mode 100644
index 0000000..47e7c00
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query11.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.BidsPerSession;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "11", 'User sessions' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into sessions with {@code windowSizeSec} max gap.
+ * However limit the session to at most {@code maxLogEvents}. Emit the number of
+ * bids per session.
+ */
+public class Query11 extends NexmarkQuery {
+  public Query11(NexmarkConfiguration configuration) {
+    super(configuration, "Query11");
+  }
+
+  /** Count bids per bidder in session windows, using a repeated element-count trigger. */
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    Duration sessionGap = Duration.standardSeconds(configuration.windowSizeSec);
+    Duration allowedLateness = Duration.standardSeconds(configuration.occasionalDelaySec / 2);
+    return events.apply(JUST_BIDS)
+        .apply(name + ".Rekey", ParDo.of(new DoFn<Bid, Long>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            c.output(c.element().bidder);
+          }
+        }))
+        .apply(Window.<Long>into(Sessions.withGapDuration(sessionGap))
+            .triggering(
+                Repeatedly.forever(AfterPane.elementCountAtLeast(configuration.maxLogEvents)))
+            .discardingFiredPanes()
+            .withAllowedLateness(allowedLateness))
+        .apply(Count.<Long>perElement())
+        .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            KV<Long, Long> bidderAndCount = c.element();
+            c.output(new BidsPerSession(bidderAndCount.getKey(), bidderAndCount.getValue()));
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
new file mode 100644
index 0000000..0f4b232
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query12.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.BidsPerSession;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+/**
+ * Query "12", 'Processing time windows' (Not in original suite.)
+ *
+ * <p>Group bids by the same user into processing time windows of windowSize. Emit the count
+ * of bids per window.
+ */
+public class Query12 extends NexmarkQuery {
+  public Query12(NexmarkConfiguration configuration) {
+    super(configuration, "Query12");
+  }
+
+  /** Count bids per bidder in the global window, firing on a processing-time delay. */
+  private PCollection<BidsPerSession> applyTyped(PCollection<Event> events) {
+    // Reduce every bid to the id of its bidder.
+    PCollection<Long> bidders = events.apply(JUST_BIDS)
+        .apply(ParDo.of(new DoFn<Bid, Long>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Bid bid = c.element();
+            c.output(bid.bidder);
+          }
+        }));
+    // Fire a pane windowSizeSec of processing time after its first element arrives.
+    Duration paneDelay = Duration.standardSeconds(configuration.windowSizeSec);
+    PCollection<Long> biddersWindowed = bidders.apply(
+        Window.<Long>into(new GlobalWindows())
+            .triggering(Repeatedly.forever(
+                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(paneDelay)))
+            .discardingFiredPanes()
+            .withAllowedLateness(Duration.ZERO));
+    return biddersWindowed.apply(Count.<Long>perElement())
+        .apply(name + ".ToResult", ParDo.of(new DoFn<KV<Long, Long>, BidsPerSession>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            c.output(new BidsPerSession(c.element().getKey(), c.element().getValue()));
+          }
+        }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
new file mode 100644
index 0000000..76c182a
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query1Model.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query1}.
+ */
+public class Query1Model extends NexmarkQueryModel implements Serializable {
+  /** Simulator for query 1: converts the price of each bid and drops all other events. */
+  private static class Simulator extends AbstractSimulator<Event, Bid> {
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    /** Consume one input event, emitting the converted bid if the event carries one. */
+    @Override
+    protected void run() {
+      TimestampedValue<Event> input = nextInput();
+      if (input == null) {
+        // The input is exhausted.
+        allDone();
+        return;
+      }
+      Bid original = input.getValue().bid;
+      // Non-bid events are ignored.
+      if (original != null) {
+        // Scale the price by 89/100 and keep every other field of the bid.
+        Bid converted = new Bid(original.auction, original.bidder, original.price * 89 / 100,
+            original.dateTime, original.extra);
+        addResult(TimestampedValue.of(converted, input.getTimestamp()));
+      }
+    }
+  }
+
+  /** Create the model for query 1 using {@code configuration}. */
+  public Query1Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Return a new simulator built from this model's configuration. */
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Convert results via {@code toValueTimestampOrder}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
new file mode 100644
index 0000000..c5ab992
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.AuctionPrice;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Query 2, 'Filtering'. Find bids with specific auction ids and show their bid price.
+ * In CQL syntax:
+ *
+ * <pre>
+ * SELECT Rstream(auction, price)
+ * FROM Bid [NOW]
+ * WHERE auction = 1007 OR auction = 1020 OR auction = 2001 OR auction = 2019 OR auction = 2087;
+ * </pre>
+ *
+ * <p>As written, that query will only yield a few hundred results over event streams of
+ * arbitrary size. To make it more interesting we instead choose bids for every
+ * {@code auctionSkip}'th auction.
+ */
+public class Query2 extends NexmarkQuery {
+  public Query2(NexmarkConfiguration configuration) {
+    super(configuration, "Query2");
+  }
+
+  /**
+   * Keeps the bids whose auction id is a multiple of {@code configuration.auctionSkip} and
+   * projects each of them to its auction id and price.
+   */
+  private PCollection<AuctionPrice> applyTyped(PCollection<Event> events) {
+    // Only want the bid events.
+    PCollection<Bid> allBids = events.apply(JUST_BIDS);
+
+    // Select just the bids for the auctions we care about.
+    PCollection<Bid> selectedBids =
+        allBids.apply(
+            Filter.by(
+                new SerializableFunction<Bid, Boolean>() {
+                  @Override
+                  public Boolean apply(Bid candidate) {
+                    return candidate.auction % configuration.auctionSkip == 0;
+                  }
+                }));
+
+    // Project just auction id and price.
+    return selectedBids.apply(
+        name + ".Project",
+        ParDo.of(
+            new DoFn<Bid, AuctionPrice>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                Bid selected = c.element();
+                c.output(new AuctionPrice(selected.auction, selected.price));
+              }
+            }));
+  }
+
+  /** Runs the typed query and hands the result to {@code NexmarkUtils.castToKnownSize}. */
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    PCollection<AuctionPrice> typedResult = applyTyped(events);
+    return NexmarkUtils.castToKnownSize(name, typedResult);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
new file mode 100644
index 0000000..33a1f8d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query2Model.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.AuctionPrice;
+import org.apache.beam.sdk.nexmark.model.Bid;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * A direct implementation of {@link Query2}.
+ */
+public class Query2Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 2: emits the auction id and price of every bid whose auction id is a
+   * multiple of {@code configuration.auctionSkip}.
+   *
+   * <p>This is a static nested class, like the simulators of the sibling query models. It keeps
+   * its own reference to the configuration it was constructed with rather than reading the
+   * field of an enclosing {@code Query2Model} instance.
+   */
+  private static class Simulator extends AbstractSimulator<Event, AuctionPrice> {
+    /** Configuration supplying {@code auctionSkip}. */
+    private final NexmarkConfiguration configuration;
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+      this.configuration = configuration;
+    }
+
+    /**
+     * Consumes one input. Calls {@code allDone()} when {@code nextInput()} yields
+     * {@code null}; otherwise records at most one result.
+     */
+    @Override
+    protected void run() {
+      TimestampedValue<Event> timestampedEvent = nextInput();
+      if (timestampedEvent == null) {
+        allDone();
+        return;
+      }
+      Event event = timestampedEvent.getValue();
+      if (event.bid == null) {
+        // Ignore non-bid events.
+        return;
+      }
+      Bid bid = event.bid;
+      if (bid.auction % configuration.auctionSkip != 0) {
+        // Ignore bids for auctions we don't care about.
+        return;
+      }
+      AuctionPrice auctionPrice = new AuctionPrice(bid.auction, bid.price);
+      TimestampedValue<AuctionPrice> result =
+          TimestampedValue.of(auctionPrice, timestampedEvent.getTimestamp());
+      addResult(result);
+    }
+  }
+
+  public Query2Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Returns a fresh simulator over this model's configuration. */
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Converts the results by delegating to {@code toValueTimestampOrder}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValueTimestampOrder(itr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
new file mode 100644
index 0000000..6f8d72d
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.KnownSize;
+import org.apache.beam.sdk.nexmark.model.NameCityStateId;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Query 3, 'Local Item Suggestion'. Who is selling in OR, ID or CA in category 10, and for what
+ * auction ids? In CQL syntax:
+ *
+ * <pre>
+ * SELECT Istream(P.name, P.city, P.state, A.id)
+ * FROM Auction A [ROWS UNBOUNDED], Person P [ROWS UNBOUNDED]
+ * WHERE A.seller = P.id AND (P.state = `OR' OR P.state = `ID' OR P.state = `CA') AND A.category
+ * = 10;
+ * </pre>
+ *
+ * <p>We'll implement this query to allow 'new auction' events to come before the 'new person'
+ * events for the auction seller. Those auctions will be stored until the matching person is seen.
+ * Then all subsequent auctions for a person will use the stored person record.
+ *
+ * <p>A real system would use an external system to maintain the id-to-person association.
+ *
+ * <p>Implementation outline, as built by {@code applyTyped}:
+ *
+ * <ol>
+ *   <li>Events are placed in the global window with a repeating trigger that fires once at
+ *       least 30 elements have arrived, discarding fired panes, with zero allowed lateness.
+ *   <li>New auctions in category 10 are keyed with {@code AUCTION_BY_SELLER}; new persons whose
+ *       state is OR, ID or CA are keyed with {@code PERSON_BY_ID}.
+ *   <li>Both keyed collections are co-grouped and passed to {@code JoinDoFn}; each
+ *       auction/person pair it emits is projected to a {@link NameCityStateId}.
+ * </ol>
+ *
+ * <p>The single {@code JoinDoFn} instance is created in the constructor from the query name and
+ * {@code configuration.maxAuctionsWaitingTime}.
+ */
+public class Query3 extends NexmarkQuery {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
+  private final JoinDoFn joinDoFn;
+
+  public Query3(NexmarkConfiguration configuration) {
+    super(configuration, "Query3");
+    joinDoFn = new JoinDoFn(name, configuration.maxAuctionsWaitingTime);
+  }
+
+  private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
+    int numEventsInPane = 30; // Minimum element count before the trigger fires.
+
+    PCollection<Event> eventsWindowed =
+        events.apply(
+            Window.<Event>into(new GlobalWindows())
+                .triggering(Repeatedly.forever((AfterPane.elementCountAtLeast(numEventsInPane))))
+                .discardingFiredPanes()
+                .withAllowedLateness(Duration.ZERO));
+    PCollection<KV<Long, Auction>> auctionsBySellerId =
+        eventsWindowed
+            // Only want the new auction events.
+            .apply(JUST_NEW_AUCTIONS)
+
+            // We only want auctions in category 10.
+            .apply(
+                name + ".InCategory",
+                Filter.by(
+                    new SerializableFunction<Auction, Boolean>() {
+
+                      @Override
+                      public Boolean apply(Auction auction) {
+                        return auction.category == 10;
+                      }
+                    }))
+
+            // Key auctions by their seller id.
+            .apply("AuctionBySeller", AUCTION_BY_SELLER);
+
+    PCollection<KV<Long, Person>> personsById =
+        eventsWindowed
+            // Only want the new people events.
+            .apply(JUST_NEW_PERSONS)
+
+            // We only want people in OR, ID, CA.
+            .apply(
+                name + ".InState",
+                Filter.by(
+                    new SerializableFunction<Person, Boolean>() {
+
+                      @Override
+                      public Boolean apply(Person person) {
+                        return person.state.equals("OR")
+                            || person.state.equals("ID")
+                            || person.state.equals("CA");
+                      }
+                    }))
+
+            // Key people by their id.
+            .apply("PersonById", PERSON_BY_ID);
+
+    return
+    // Join auctions and people.
+    // Combine both keyed collections into one tuple, tagged by AUCTION_TAG and PERSON_TAG.
+    KeyedPCollectionTuple.of(AUCTION_TAG, auctionsBySellerId)
+        .and(PERSON_TAG, personsById)
+        // Group auctions (keyed by seller id) and persons (keyed by person id) under one key.
+        .apply(CoGroupByKey.<Long>create())
+        .apply(name + ".Join", ParDo.of(joinDoFn))
+
+        // Project what we want.
+        .apply(
+            name + ".Project",
+            ParDo.of(
+                new DoFn<KV<Auction, Person>, NameCityStateId>() {
+
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    Auction auction = c.element().getKey();
+                    Person person = c.element().getValue();
+                    c.output(
+                        new NameCityStateId(person.name, person.city, person.state, auction.id));
+                  }
+                }));
+  }
+
+  @Override
+  protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
+    return NexmarkUtils.castToKnownSize(name, applyTyped(events));
+  }
+
+  /**
+   * Join {@code auctions} and {@code people} by person id and emit their cross-product one pair at
+   * a time.
+   *
+   * <p>We know a person may submit any number of auctions. Thus new person event must have the
+   * person record stored in persistent state in order to match future auctions by that person.
+   *
+   * <p>However we know that each auction is associated with at most one person, so only need to
+   * store auction records in persistent state until we have seen the corresponding person record.
+   * And of course may have already seen that record.
+   *
+   * <p>State and timer declared by this {@code DoFn}:
+   *
+   * <ul>
+   *   <li>{@code "person"}: the person record, written once a new person has been processed.
+   *   <li>{@code "auctions"}: the list of auctions received while no person record was stored.
+   *   <li>{@code "personStateExpiring"}: an event-time timer set to the person's
+   *       {@code dateTime} plus {@code maxAuctionsWaitingTime} seconds. When it fires, only the
+   *       person state is cleared.
+   * </ul>
+   *
+   * <p>{@code processElement} handles three cases in order: a person is already stored (new
+   * auctions are joined immediately); the element carries a person (pending and new auctions are
+   * joined, the person is stored and the timer is set); neither (new auctions are appended to
+   * the pending list).
+   *
+   * <p>If one input element carries several person records, only the first is joined and stored;
+   * each additional one is logged as an error and counted in the {@code "fatal"} counter.
+   */
+  private static class JoinDoFn extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
+
+    private final int maxAuctionsWaitingTime; // Used as a number of seconds when setting the timer.
+    private static final String AUCTIONS = "auctions";
+    private static final String PERSON = "person";
+
+    @StateId(PERSON)
+    private static final StateSpec<ValueState<Person>> personSpec =
+        StateSpecs.value(Person.CODER);
+
+    private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
+
+    @StateId(AUCTIONS)
+    private final StateSpec<ValueState<List<Auction>>> auctionsSpec =
+        StateSpecs.value(ListCoder.of(Auction.CODER));
+
+    @TimerId(PERSON_STATE_EXPIRING)
+    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    // Metrics namespace; passed to every Metrics.counter call in the constructor.
+    private final String name;
+
+    private final Counter newAuctionCounter;
+    private final Counter newPersonCounter;
+    private final Counter newNewOutputCounter;
+    private final Counter newOldOutputCounter;
+    private final Counter oldNewOutputCounter;
+    private final Counter fatalCounter;
+
+    private JoinDoFn(String name, int maxAuctionsWaitingTime) {
+      this.name = name;
+      this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
+      newAuctionCounter = Metrics.counter(name, "newAuction");
+      newPersonCounter = Metrics.counter(name, "newPerson");
+      newNewOutputCounter = Metrics.counter(name, "newNewOutput");
+      newOldOutputCounter = Metrics.counter(name, "newOldOutput");
+      oldNewOutputCounter = Metrics.counter(name, "oldNewOutput");
+      fatalCounter = Metrics.counter(name , "fatal");
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @TimerId(PERSON_STATE_EXPIRING) Timer timer,
+        @StateId(PERSON) ValueState<Person> personState,
+        @StateId(AUCTIONS) ValueState<List<Auction>> auctionsState) {
+      // We would *almost* implement this by rewindowing into the global window and
+      // running a combiner over the result. The combiner's accumulator would be the
+      // state we use below. However, combiners cannot emit intermediate results, thus
+      // we need to wait for the pending ReduceFn API.
+
+      Person existingPerson = personState.read();
+      if (existingPerson != null) {
+        // We've already seen the new person event for this person id.
+        // We can join with any new auctions on-the-fly without needing any
+        // additional persistent state.
+        for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+          newAuctionCounter.inc();
+          newOldOutputCounter.inc();
+          c.output(KV.of(newAuction, existingPerson));
+        }
+        return;
+      }
+
+      Person theNewPerson = null;
+      for (Person newPerson : c.element().getValue().getAll(PERSON_TAG)) {
+        if (theNewPerson == null) {
+          theNewPerson = newPerson;
+        } else {
+          if (theNewPerson.equals(newPerson)) {
+            LOG.error("Duplicate person {}", theNewPerson);
+          } else {
+            LOG.error("Conflicting persons {} and {}", theNewPerson, newPerson);
+          }
+          fatalCounter.inc();
+          continue; // Skip the extra person record; only the first one is joined and stored.
+        }
+        newPersonCounter.inc();
+        // We've now seen the person for this person id so can flush any
+        // pending auctions for the same seller id (an auction is done by only one seller).
+        List<Auction> pendingAuctions = auctionsState.read();
+        if (pendingAuctions != null) {
+          for (Auction pendingAuction : pendingAuctions) {
+            oldNewOutputCounter.inc();
+            c.output(KV.of(pendingAuction, newPerson));
+          }
+          auctionsState.clear();
+        }
+        // Also deal with any new auctions.
+        for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+          newAuctionCounter.inc();
+          newNewOutputCounter.inc();
+          c.output(KV.of(newAuction, newPerson));
+        }
+        // Remember this person for any future auctions.
+        personState.write(newPerson);
+        // Set a timer that clears the person state once the waiting time has elapsed.
+        Instant firingTime = new Instant(newPerson.dateTime)
+                                  .plus(Duration.standardSeconds(maxAuctionsWaitingTime));
+        timer.set(firingTime);
+      }
+      if (theNewPerson != null) {
+        return;
+      }
+
+      // We'll need to remember the auctions until we see the corresponding
+      // new person event.
+      List<Auction> pendingAuctions = auctionsState.read();
+      if (pendingAuctions == null) {
+        pendingAuctions = new ArrayList<>();
+      }
+      for (Auction newAuction : c.element().getValue().getAll(AUCTION_TAG)) {
+        newAuctionCounter.inc();
+        pendingAuctions.add(newAuction);
+      }
+      auctionsState.write(pendingAuctions);
+    }
+
+    @OnTimer(PERSON_STATE_EXPIRING)
+    public void onTimerCallback(
+        OnTimerContext context,
+        @StateId(PERSON) ValueState<Person> personState) {
+        personState.clear(); // The pending-auctions state is not touched here.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f4333df7/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
----------------------------------------------------------------------
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
new file mode 100644
index 0000000..94f24cb
--- /dev/null
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/Query3Model.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.nexmark.queries;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
+import org.apache.beam.sdk.nexmark.NexmarkUtils;
+import org.apache.beam.sdk.nexmark.model.Auction;
+import org.apache.beam.sdk.nexmark.model.Event;
+import org.apache.beam.sdk.nexmark.model.NameCityStateId;
+import org.apache.beam.sdk.nexmark.model.Person;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+
+/**
+ * A direct implementation of {@link Query3}.
+ */
+public class Query3Model extends NexmarkQueryModel implements Serializable {
+  /**
+   * Simulator for query 3: joins category-10 auctions with persons from OR, ID or CA on
+   * seller id, whichever of the two events arrives first.
+   */
+  private static class Simulator extends AbstractSimulator<Event, NameCityStateId> {
+    /** Auctions still waiting for their seller's person record, indexed by seller id. */
+    private final Multimap<Long, Auction> newAuctions = ArrayListMultimap.create();
+
+    /** Accepted persons, indexed by id. */
+    private final Map<Long, Person> newPersons = new HashMap<>();
+
+    public Simulator(NexmarkConfiguration configuration) {
+      super(NexmarkUtils.standardEventIterator(configuration));
+    }
+
+    /**
+     * Consumes one input. Calls {@code allDone()} when {@code nextInput()} yields
+     * {@code null}; bid events are skipped, the other events are dispatched by kind.
+     */
+    @Override
+    protected void run() {
+      TimestampedValue<Event> input = nextInput();
+      if (input == null) {
+        allDone();
+        return;
+      }
+      Event event = input.getValue();
+      if (event.bid != null) {
+        // Ignore bid events.
+        return;
+      }
+
+      Instant timestamp = input.getTimestamp();
+      if (event.newAuction != null) {
+        onNewAuction(event.newAuction, timestamp);
+      } else {
+        onNewPerson(event.newPerson, timestamp);
+      }
+    }
+
+    /**
+     * Handles a new auction: joins it with its seller when that person is already known,
+     * otherwise parks it until the person shows up. Auctions outside category 10 are dropped.
+     */
+    private void onNewAuction(Auction auction, Instant timestamp) {
+      if (auction.category != 10) {
+        return;
+      }
+      Person seller = newPersons.get(auction.seller);
+      if (seller == null) {
+        // Remember auction for future new person event.
+        newAuctions.put(auction.seller, auction);
+        return;
+      }
+      addJoinResult(auction, seller, timestamp);
+    }
+
+    /**
+     * Handles a new person: flushes every auction parked under the person's id, then remembers
+     * the person for later auctions. Persons outside OR, ID and CA are dropped.
+     */
+    private void onNewPerson(Person person, Instant timestamp) {
+      if (!livesInWantedState(person)) {
+        return;
+      }
+      for (Auction waiting : newAuctions.get(person.id)) {
+        addJoinResult(waiting, person, timestamp);
+      }
+      // We'll never need these auctions again.
+      newAuctions.removeAll(person.id);
+      newPersons.put(person.id, person);
+    }
+
+    /** Tells whether the person's state is one of OR, ID or CA. */
+    private static boolean livesInWantedState(Person person) {
+      return person.state.equals("OR")
+          || person.state.equals("ID")
+          || person.state.equals("CA");
+    }
+
+    /**
+     * Records one joined auction/person pair with the given timestamp.
+     */
+    private void addJoinResult(Auction auction, Person person, Instant timestamp) {
+      NameCityStateId joined =
+          new NameCityStateId(person.name, person.city, person.state, auction.id);
+      addResult(TimestampedValue.of(joined, timestamp));
+    }
+  }
+
+  public Query3Model(NexmarkConfiguration configuration) {
+    super(configuration);
+  }
+
+  /** Returns a fresh simulator over this model's configuration. */
+  @Override
+  public AbstractSimulator<?, ?> simulator() {
+    return new Simulator(configuration);
+  }
+
+  /** Converts the results by delegating to {@code toValue}. */
+  @Override
+  protected <T> Collection<String> toCollection(Iterator<TimestampedValue<T>> itr) {
+    return toValue(itr);
+  }
+}